# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import abc
import errno
import os
import re
import shutil
import subprocess
import uuid
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import (
Dict,
Iterator,
List,
Optional,
Union,
)
from mach.util import to_optional_path
from mozfile import which
from mozpack.files import FileListFinder
class MissingVCSTool(Exception):
"""Represents a failure to find a version control tool binary."""
class MissingVCSInfo(Exception):
"""Represents a general failure to resolve a VCS interface."""
class MissingConfigureInfo(MissingVCSInfo):
"""Represents error finding VCS info from configure data."""
class MissingVCSExtension(MissingVCSInfo):
"""Represents error finding a required VCS extension."""
def __init__(self, ext):
self.ext = ext
msg = "Could not detect required extension '{}'".format(self.ext)
super(MissingVCSExtension, self).__init__(msg)
class InvalidRepoPath(Exception):
"""Represents a failure to find a VCS repo at a specified path."""
class MissingUpstreamRepo(Exception):
"""Represents a failure to automatically detect an upstream repo."""
class CannotDeleteFromRootOfRepositoryException(Exception):
"""Represents that the code attempted to delete all files from the root of
the repository, which is not permitted."""
def get_tool_path(tool: Optional[Union[str, Path]] = None):
"""Obtain the path of `tool`."""
tool = Path(tool)
if tool.is_absolute() and tool.exists():
return str(tool)
path = to_optional_path(which(str(tool)))
if not path:
raise MissingVCSTool(
f"Unable to obtain {tool} path. Try running "
"|mach bootstrap| to ensure your environment is up to "
"date."
)
return str(path)
class Repository(object):
"""A class wrapping utility methods around version control repositories.
This class is abstract and never instantiated. Obtain an instance by
calling a ``get_repository_*()`` helper function.
Clients are encouraged to use the object as a context manager, although
not all methods require it.
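A minimal usage sketch (the checkout path here is illustrative):

    with get_repository_object(Path("/path/to/checkout")) as repo:
        print(repo.name, repo.head_ref)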
"""
__metaclass__ = abc.ABCMeta
def __init__(self, path: Path, tool: Optional[str] = None):
self.path = str(path.resolve())
self._tool = Path(get_tool_path(tool)) if tool else None
self._version = None
self._valid_diff_filter = ("m", "a", "d")
self._env = os.environ.copy()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
pass
def _run(self, *args, encoding="utf-8", **runargs):
return_codes = runargs.get("return_codes", [])
cmd = (str(self._tool),) + args
# Check whether we have a tool (hg or git) to run. If this is a source
# release there is no tool to use, so return "src" instead of invoking a
# command. Without this check, jstests failed when run against a packaged
# mozjs release source tarball.
if not self._tool:
return "src"
else:
try:
return subprocess.check_output(
cmd,
cwd=self.path,
env=self._env,
encoding=encoding,
)
except subprocess.CalledProcessError as e:
if e.returncode in return_codes:
return ""
raise
@property
def tool_version(self):
"""Return the version of the VCS tool in use as a string."""
if self._version:
return self._version
info = self._run("--version").strip()
match = re.search(r"version ([^+)]+)", info)
if not match:
raise Exception("Unable to identify tool version.")
self._version = match.group(1)
return self._version
@property
def has_git_cinnabar(self):
"""True if the repository is using git cinnabar."""
return False
@abc.abstractproperty
def name(self):
"""Name of the tool."""
@abc.abstractproperty
def head_ref(self):
"""Hash of HEAD revision."""
@abc.abstractproperty
def base_ref(self):
"""Hash of revision the current topic branch is based on."""
@abc.abstractmethod
def base_ref_as_hg(self):
"""Mercurial hash of revision the current topic branch is based on.
Return None if the hg hash of the base ref could not be calculated.
"""
@abc.abstractproperty
def branch(self):
"""Current branch or bookmark the checkout has active."""
@abc.abstractmethod
def get_commit_time(self):
"""Return the Unix time of the HEAD revision."""
@abc.abstractmethod
def sparse_checkout_present(self):
"""Whether the working directory is using a sparse checkout.
A sparse checkout is defined as a working directory that only
materializes a subset of files in a given revision.
Returns a bool.
"""
@abc.abstractmethod
def get_user_email(self):
"""Return the user's email address.
If no email is configured, then None is returned.
"""
@abc.abstractmethod
def get_changed_files(self, diff_filter, mode="unstaged", rev=None):
"""Return a list of files that are changed in this repository's
working copy.
``diff_filter`` controls which kinds of modifications are returned.
It is a string which may only contain the following characters:
A - Include files that were added
D - Include files that were deleted
M - Include files that were modified
By default, all three will be included.
``mode`` can be one of 'unstaged', 'staged' or 'all'. Only has an
effect on git. Defaults to 'unstaged'.
``rev`` is a specifier for which changesets to consider for
changes. The exact meaning depends on the vcs system being used.
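A sketch of a call listing added and modified files in the working copy
(the filter string and mode are illustrative):

    changed = repo.get_changed_files("AM", mode="all")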
"""
@abc.abstractmethod
def get_outgoing_files(self, diff_filter, upstream):
"""Return a list of changed files compared to upstream.
``diff_filter`` works the same as `get_changed_files`.
``upstream`` is a remote ref to compare against. If unspecified,
this will be determined automatically. If there is no remote ref,
a MissingUpstreamRepo exception will be raised.
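A sketch of a typical call, letting the upstream be detected
automatically:

    outgoing = repo.get_outgoing_files("ADM")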
"""
@abc.abstractmethod
def add_remove_files(self, *paths: Union[str, Path]):
"""Add and remove files under `paths` in this repository's working copy."""
@abc.abstractmethod
def forget_add_remove_files(self, *paths: Union[str, Path]):
"""Undo the effects of a previous add_remove_files call for `paths`."""
@abc.abstractmethod
def get_tracked_files_finder(self, path=None):
"""Obtain a mozpack.files.BaseFinder of managed files in the working
directory.
The Finder will have its list of all files in the repo cached for its
entire lifetime, so operations on the Finder will not track with, for
example, commits to the repo during the Finder's lifetime.
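A sketch of how the returned finder might be used (assuming the
``find`` interface of mozpack finders):

    finder = repo.get_tracked_files_finder()
    python_files = [path for path, _ in finder.find("**/*.py")]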
"""
@abc.abstractmethod
def get_ignored_files_finder(self):
"""Obtain a mozpack.files.BaseFinder of ignored files in the working
directory.
The Finder will have its list of all files in the repo cached for its
entire lifetime, so operations on the Finder will not track with, for
example, changes to the repo during the Finder's lifetime.
"""
@abc.abstractmethod
def working_directory_clean(self, untracked=False, ignored=False):
"""Determine if the working directory is free of modifications.
Returns True if the working directory does not have any file
modifications. False otherwise.
By default, untracked and ignored files are not considered. If
``untracked`` or ``ignored`` are set, they influence the clean check
to factor these file classes into consideration.
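For example, a sketch of a guard that also counts untracked files:

    if not repo.working_directory_clean(untracked=True):
        raise Exception("working directory is not clean")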
"""
@abc.abstractmethod
def clean_directory(self, path: Union[str, Path]):
"""Undo all changes (including removing new untracked files) in the
given `path`.
"""
@abc.abstractmethod
def push_to_try(
self,
message: str,
changed_files: Dict[str, str] = {},
allow_log_capture: bool = False,
):
"""Create a temporary commit, push it to try and clean it up
afterwards.
With mercurial, MissingVCSExtension will be raised if the `push-to-try`
extension is not installed. On git, MissingVCSExtension will be raised
if git cinnabar is not present.
`changed_files` is a dict of file paths and their contents, see
`stage_changes`.
If `allow_log_capture` is set to `True`, then the push-to-try will be run using
Popen instead of check_call so that the logs can be captured elsewhere.
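A sketch of a push with an extra in-memory file (the try syntax and
file contents are illustrative):

    repo.push_to_try(
        "try: -b do -p linux64 -u none -t none",
        changed_files={"try_task_config.json": "{}"},
    )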
"""
@abc.abstractmethod
def update(self, ref):
"""Update the working directory to the specified reference."""
def commit(self, message, author=None, date=None, paths=None):
"""Create a commit using the provided commit message. The author, date,
and files/paths to be included may also be optionally provided. The
message, author and date arguments must be strings, and are passed as-is
to the commit command. Multiline commit messages are supported. The
paths argument must be None or an array of strings that represents the
set of files and folders to include in the commit.
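A sketch of a call (the message, author and path are illustrative):

    repo.commit(
        "Bug 1234567 - Update in-tree docs",
        author="Jane Doe <jdoe@example.com>",
        paths=["docs/index.rst"],
    )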
"""
args = ["commit", "-m", message]
if author is not None:
if isinstance(self, HgRepository):
args = args + ["--user", author]
elif isinstance(self, GitRepository):
args = args + ["--author", author]
else:
raise MissingVCSInfo("Unknown repo type")
if date is not None:
args = args + ["--date", date]
if paths is not None:
args = args + paths
self._run(*args)
def _push_to_try_with_log_capture(self, cmd, subprocess_opts):
"""Push to try but with the ability for the user to capture logs.
We need to use Popen for this because neither the run method nor
check_call will allow us to reasonably catch the logs. With check_call,
hg hangs, and with the run method, the logs are output too slowly
so you're left wondering whether it's working (a prime candidate for
corrupting local repos).
"""
process = subprocess.Popen(cmd, **subprocess_opts)
# Print out the lines as they appear so they can be
# parsed for information
for line in process.stdout or []:
print(line)
process.stdout.close()
process.wait()
if process.returncode != 0:
for line in process.stderr or []:
print(line)
raise subprocess.CalledProcessError(
returncode=process.returncode,
cmd=cmd,
output="Failed to push-to-try",
stderr=process.stderr,
)
@abc.abstractmethod
def get_branch_nodes(self, head: Optional[str] = None) -> List[str]:
"""Return a list of commit SHAs for nodes on the current branch."""
@abc.abstractmethod
def get_commit_patches(self, nodes: List[str]) -> List[bytes]:
"""Return the contents of the patches for the given `nodes` in the VCS's standard format."""
@contextmanager
@abc.abstractmethod
def try_commit(
self, commit_message: str, changed_files: Optional[Dict[str, str]] = None
):
"""Create a temporary try commit as a context manager.
Create a new commit using `commit_message` as the commit message. The commit
may be empty, for example when only including try syntax.
`changed_files` may contain a dict of file paths and their contents,
see `stage_changes`.
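A sketch of typical usage (the message and file are illustrative):

    with repo.try_commit("try: -b do -p linux64", {"try_task_config.json": "{}"}) as head:
        print("temporary try commit:", head)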
"""
def stage_changes(self, changed_files: Dict[str, str]):
"""Stage a set of file changes
`changed_files` is a dict that contains the paths of files to change or
create as keys and their respective contents as values.
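For example, a sketch that writes and stages one illustrative file:

    repo.stage_changes({"try_task_config.json": '{"version": 1}'})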
"""
paths = []
for path, content in changed_files.items():
full_path = Path(self.path) / path
full_path.parent.mkdir(parents=True, exist_ok=True)
with full_path.open("w") as fh:
fh.write(content)
paths.append(full_path)
if paths:
self.add_remove_files(*paths)
@abc.abstractmethod
def get_last_modified_time_for_file(self, path: Path):
"""Return last modified in VCS time for the specified file."""
pass
class HgRepository(Repository):
"""An implementation of `Repository` for Mercurial repositories."""
def __init__(self, path: Path, hg="hg"):
import hglib.client
super(HgRepository, self).__init__(path, tool=hg)
self._env["HGPLAIN"] = "1"
# Setting this modifies a global variable and makes all future hglib
# instances use this binary. Since the tool path was validated, this
# should be OK. But ideally hglib would offer an API that defines
# per-instance binaries.
hglib.HGPATH = str(self._tool)
# Without connect=False this spawns a persistent process. We want
# the process lifetime tied to a context manager.
self._client = hglib.client.hgclient(
self.path, encoding="UTF-8", configs=None, connect=False
)
@property
def name(self):
return "hg"
@property
def head_ref(self):
return self._run("log", "-r", ".", "-T", "{node}")
@property
def base_ref(self):
return self._run("log", "-r", "last(ancestors(.) and public())", "-T", "{node}")
def base_ref_as_hg(self):
return self.base_ref
@property
def branch(self):
bookmarks_fn = Path(self.path) / ".hg" / "bookmarks.current"
if bookmarks_fn.exists():
with open(bookmarks_fn) as f:
bookmark = f.read()
return bookmark or None
return None
def __enter__(self):
if self._client.server is None:
# The cwd of the spawned process should be the repo root to ensure
# relative paths are normalized to it.
old_cwd = Path.cwd()
try:
os.chdir(self.path)
self._client.open()
finally:
os.chdir(old_cwd)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._client.close()
def _run(self, *args, **runargs):
if not self._client.server:
return super(HgRepository, self)._run(*args, **runargs)
# hglib requires bytes on python 3
args = [a.encode("utf-8") if not isinstance(a, bytes) else a for a in args]
return self._client.rawcommand(args).decode("utf-8")
def get_commit_time(self):
newest_public_revision_time = self._run(
"log",
"--rev",
"heads(ancestors(.) and not draft())",
"--template",
"{word(0, date|hgdate)}",
"--limit",
"1",
).strip()
if not newest_public_revision_time:
raise RuntimeError(
"Unable to find a non-draft commit in this hg "
"repository. If you created this repository from a "
'bundle, have you done a "hg pull" from hg.mozilla.org '
"since?"
)
return int(newest_public_revision_time)
def sparse_checkout_present(self):
# We assume a sparse checkout is enabled if the .hg/sparse file
# has data. Strictly speaking, we should look for a requirement in
# .hg/requires. But since the requirement is still experimental
# as of Mercurial 4.3, it's probably more trouble than it's worth
# to verify it.
sparse = Path(self.path) / ".hg" / "sparse"
try:
st = sparse.stat()
return st.st_size > 0
except OSError as e:
if e.errno != errno.ENOENT:
raise
return False
def get_user_email(self):
# Output is in the form "First Last <flast@mozilla.com>"
username = self._run("config", "ui.username", return_codes=[0, 1])
if not username:
# No username is set
return None
match = re.search(r"<(.*)>", username)
if not match:
# "ui.username" doesn't follow the "Full Name <email@domain>" convention
return None
return match.group(1)
def _format_diff_filter(self, diff_filter, for_status=False):
df = diff_filter.lower()
assert all(f in self._valid_diff_filter for f in df)
# When looking at the changes in the working directory, the hg status
# command uses 'd' for files that have been deleted with a non-hg
# command, and 'r' for files that have been `hg rm`ed. Use both.
return df.replace("d", "dr") if for_status else df
def _files_template(self, diff_filter):
template = ""
df = self._format_diff_filter(diff_filter)
if "a" in df:
template += "{file_adds % '{file}\\n'}"
if "d" in df:
template += "{file_dels % '{file}\\n'}"
if "m" in df:
template += "{file_mods % '{file}\\n'}"
return template
def get_changed_files(self, diff_filter="ADM", mode="unstaged", rev=None):
if rev is None:
# Use --no-status to print just the filename.
df = self._format_diff_filter(diff_filter, for_status=True)
return self._run("status", "--no-status", "-{}".format(df)).splitlines()
else:
template = self._files_template(diff_filter)
return self._run("log", "-r", rev, "-T", template).splitlines()
def get_outgoing_files(self, diff_filter="ADM", upstream=None):
template = self._files_template(diff_filter)
if not upstream:
return self._run(
"log", "-r", "draft() and ancestors(.)", "--template", template
).split()
return self._run(
"outgoing",
"-r",
".",
"--quiet",
"--template",
template,
upstream,
return_codes=(1,),
).split()
def add_remove_files(self, *paths: Union[str, Path]):
if not paths:
return
paths = [str(path) for path in paths]
args = ["addremove"] + paths
m = re.search(r"\d+\.\d+", self.tool_version)
simplified_version = float(m.group(0)) if m else 0
if simplified_version >= 3.9:
args = ["--config", "extensions.automv="] + args
self._run(*args)
def forget_add_remove_files(self, *paths: Union[str, Path]):
if not paths:
return
paths = [str(path) for path in paths]
self._run("forget", *paths)
def get_tracked_files_finder(self, path=None):
# Can return backslashes on Windows. Normalize to forward slashes.
files = list(
p.replace("\\", "/") for p in self._run("files", "-0").split("\0") if p
)
return FileListFinder(files)
def get_ignored_files_finder(self):
# Can return backslashes on Windows. Normalize to forward slashes.
files = list(
p.replace("\\", "/").split(" ")[-1]
for p in self._run("status", "-i").split("\n")
if p
)
return FileListFinder(files)
def working_directory_clean(self, untracked=False, ignored=False):
args = ["status", "--modified", "--added", "--removed", "--deleted"]
if untracked:
args.append("--unknown")
if ignored:
args.append("--ignored")
# If output is empty, there are no entries of requested status, which
# means we are clean.
return not len(self._run(*args).strip())
def clean_directory(self, path: Union[str, Path]):
if Path(self.path).samefile(path):
raise CannotDeleteFromRootOfRepositoryException()
self._run("revert", str(path))
for single_path in self._run("st", "-un", str(path)).splitlines():
single_path = Path(single_path)
if single_path.is_file():
single_path.unlink()
else:
shutil.rmtree(str(single_path))
def update(self, ref):
return self._run("update", "--check", ref)
def raise_for_missing_extension(self, extension: str):
"""Raise `MissingVCSExtension` if `extension` is not installed and enabled."""
try:
self._run("showconfig", f"extensions.{extension}")
except subprocess.CalledProcessError:
raise MissingVCSExtension(extension)
def push_to_try(
self,
message: str,
changed_files: Dict[str, str] = {},
allow_log_capture: bool = False,
):
if changed_files:
self.stage_changes(changed_files)
try:
cmd = (str(self._tool), "push-to-try", "-m", message)
if allow_log_capture:
self._push_to_try_with_log_capture(
cmd,
{
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"cwd": self.path,
"env": self._env,
"universal_newlines": True,
"bufsize": 1,
},
)
else:
subprocess.check_call(
cmd,
cwd=self.path,
env=self._env,
)
except subprocess.CalledProcessError:
self.raise_for_missing_extension("push-to-try")
raise
finally:
self._run("revert", "-a")
def get_branch_nodes(
self, head: Optional[str] = None, base_ref: Optional[str] = None
) -> List[str]:
"""Return a list of commit SHAs for nodes on the current branch."""
if not base_ref:
base_ref = self.base_ref
head_ref = head or self.head_ref
return self._run(
"log",
"-r",
f"{base_ref}::{head_ref} and not {base_ref}",
"-T",
"{node}\n",
).splitlines()
def get_commit_patches(self, nodes: List[str]) -> List[bytes]:
"""Return the contents of the patch `node` in the VCS' standard format."""
# Running `hg export` once for each commit in a large stack is
# slow, so instead we run it once and parse the output for each
# individual patch.
args = ["export"]
for node in nodes:
args.extend(("-r", node))
output = self._run(*args).encode("utf-8")
patches = []
current_patch = []
for i, line in enumerate(output.splitlines()):
if i != 0 and line == b"# HG changeset patch":
# When we see the first line of a new patch, add the patch we have been
# building to the patches list and start building a new patch.
patches.append(b"\n".join(current_patch))
current_patch = [line]
else:
# Add a new line to the patch being built.
current_patch.append(line)
# Add the last patch to the stack.
patches.append(b"\n".join(current_patch))
return patches
@contextmanager
def try_commit(
self, commit_message: str, changed_files: Optional[Dict[str, str]] = None
):
"""Create a temporary try commit as a context manager.
Create a new commit using `commit_message` as the commit message. The commit
may be empty, for example when only including try syntax.
`changed_files` may contain a dict of file paths and their contents,
see `stage_changes`.
"""
if changed_files:
self.stage_changes(changed_files)
# Allow empty commit messages in case we only use try-syntax.
self._run("--config", "ui.allowemptycommit=1", "commit", "-m", commit_message)
yield self.head_ref
try:
self._run("prune", ".")
except subprocess.CalledProcessError:
# The `evolve` extension is required for `uncommit` and `prune`.
self.raise_for_missing_extension("evolve")
raise
def get_last_modified_time_for_file(self, path: Path):
"""Return last modified in VCS time for the specified file."""
out = self._run(
"log",
"--template",
"{date|isodatesec}",
"--limit",
"1",
"--follow",
str(path),
)
return datetime.strptime(out.strip(), "%Y-%m-%d %H:%M:%S %z")
class GitRepository(Repository):
"""An implementation of `Repository` for Git repositories."""
def __init__(self, path: Path, git="git"):
super(GitRepository, self).__init__(path, tool=git)
@property
def name(self):
return "git"
@property
def head_ref(self):
return self._run("rev-parse", "HEAD").strip()
def get_mozilla_upstream_remotes(self) -> Iterator[str]:
"""Return the Mozilla-official upstream remotes for this repo."""
out = self._run("remote", "-v")
if not out:
return
remotes = out.splitlines()
if not remotes:
return
for line in remotes:
name, url, action = line.split()
# Only consider fetch sources.
if action != "(fetch)":
continue
# Return any `hg.mozilla.org` remotes, ignoring `try`.
if "hg.mozilla.org" in url and not url.endswith("hg.mozilla.org/try"):
yield name
def get_mozilla_remote_args(self) -> List[str]:
"""Return a list of `--remotes` arguments to limit commits to official remotes."""
official_remotes = [
f"--remotes={remote}" for remote in self.get_mozilla_upstream_remotes()
]
return official_remotes if official_remotes else ["--remotes"]
@property
def base_ref(self):
remote_args = self.get_mozilla_remote_args()
refs = self._run(
"rev-list", "HEAD", "--topo-order", "--boundary", "--not", *remote_args
).splitlines()
if refs:
return refs[-1][1:] # boundary starts with a prefix `-`
return self.head_ref
def base_ref_as_hg(self):
base_ref = self.base_ref
try:
return self._run("cinnabar", "git2hg", base_ref).strip()
except subprocess.CalledProcessError:
return
@property
def branch(self):
# This mimics `git branch --show-current` for older versions of git.
branch = self._run("symbolic-ref", "-q", "HEAD", return_codes=[0, 1]).strip()
if not branch.startswith("refs/heads/"):
return None
return branch[len("refs/heads/") :]
@property
def has_git_cinnabar(self):
try:
self._run("cinnabar", "--version")
except subprocess.CalledProcessError:
return False
return True
def get_commit_time(self):
return int(self._run("log", "-1", "--format=%ct").strip())
def sparse_checkout_present(self):
# Not yet implemented.
return False
def get_user_email(self):
email = self._run("config", "user.email", return_codes=[0, 1])
if not email:
return None
return email.strip()
def get_changed_files(self, diff_filter="ADM", mode="unstaged", rev=None):
assert all(f.lower() in self._valid_diff_filter for f in diff_filter)
if rev is None:
cmd = ["diff"]
if mode == "staged":
cmd.append("--cached")
elif mode == "all":
cmd.append("HEAD")
else:
cmd = ["diff-tree", "-r", "--no-commit-id", rev]
cmd.append("--name-only")
cmd.append("--diff-filter=" + diff_filter.upper())
return self._run(*cmd).splitlines()
def get_outgoing_files(self, diff_filter="ADM", upstream=None):
assert all(f.lower() in self._valid_diff_filter for f in diff_filter)
not_condition = upstream if upstream else "--remotes"
files = self._run(
"log",
"--name-only",
"--diff-filter={}".format(diff_filter.upper()),
"--oneline",
"--pretty=format:",
"HEAD",
"--not",
not_condition,
).splitlines()
return [f for f in files if f]
def add_remove_files(self, *paths: Union[str, Path]):
if not paths:
return
paths = [str(path) for path in paths]
self._run("add", *paths)
def forget_add_remove_files(self, *paths: Union[str, Path]):
if not paths:
return
paths = [str(path) for path in paths]
self._run("reset", *paths)
def get_tracked_files_finder(self, path=None):
files = [p for p in self._run("ls-files", "-z").split("\0") if p]
return FileListFinder(files)
def get_ignored_files_finder(self):
files = [
p
for p in self._run(
"ls-files", "-i", "-o", "-z", "--exclude-standard"
).split("\0")
if p
]
return FileListFinder(files)
def working_directory_clean(self, untracked=False, ignored=False):
args = ["status", "--porcelain"]
# Even in --porcelain mode, behavior is affected by the
# ``status.showUntrackedFiles`` option, which means we need to be
# explicit about how to treat untracked files.
if untracked:
args.append("--untracked-files=all")
else:
args.append("--untracked-files=no")
if ignored:
args.append("--ignored")
return not len(self._run(*args).strip())
def clean_directory(self, path: Union[str, Path]):
if Path(self.path).samefile(path):
raise CannotDeleteFromRootOfRepositoryException()
self._run("checkout", "--", str(path))
self._run("clean", "-df", str(path))
def update(self, ref):
self._run("checkout", ref)
def push_to_try(
self,
message: str,
changed_files: Dict[str, str] = {},
allow_log_capture: bool = False,
):
if not self.has_git_cinnabar:
raise MissingVCSExtension("cinnabar")
with self.try_commit(message, changed_files) as head:
cmd = (
str(self._tool),
"-c",
# Never store git-cinnabar metadata for pushes to try.
# Normally git-cinnabar asks the server for the phase of what it pushed
# and figures it out on its own, but that request takes a long time on try.
"cinnabar.data=never",
"push",
f"+{head}:refs/heads/branches/default/tip",
)
if allow_log_capture:
self._push_to_try_with_log_capture(
cmd,
{
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"cwd": self.path,
"universal_newlines": True,
"bufsize": 1,
},
)
else:
subprocess.check_call(cmd, cwd=self.path)
def set_config(self, name, value):
self._run("config", name, value)
def get_branch_nodes(self, head: Optional[str] = None) -> List[str]:
"""Return a list of commit SHAs for nodes on the current branch."""
remote_args = self.get_mozilla_remote_args()
return self._run(
"log",
head or "HEAD",
"--reverse",
"--not",
*remote_args,
"--pretty=%H",
).splitlines()
def get_commit_patches(self, nodes: List[str]) -> List[bytes]:
"""Return the contents of the patch `node` in the VCS' standard format."""
return [
self._run("format-patch", node, "-1", "--always", "--stdout").encode(
"utf-8"
)
for node in nodes
]
@contextmanager
def try_commit(
self, commit_message: str, changed_files: Optional[Dict[str, str]] = None
):
"""Create a temporary try commit as a context manager.
Create a new commit using `commit_message` as the commit message. The commit
may be empty, for example when only including try syntax.
`changed_files` may contain a dict of file paths and their contents,
see `stage_changes`.
"""
current_head = self.head_ref
def data(content):
return f"data {len(content)}\n{content}"
author = self._run("var", "GIT_AUTHOR_IDENT").strip()
committer = self._run("var", "GIT_COMMITTER_IDENT").strip()
# A random enough temporary branch name that shouldn't conflict with
# anything else, even in the machtry namespace.
branch = str(uuid.uuid4())
# The following fast-import script creates a new commit on a temporary
# branch that it deletes at the end, based off the current HEAD, and
# adding or modifying the files from `changed_files`.
# fast-import will output the sha1 for that temporary commit on stdout
# (via `get-mark`).
fast_import = "\n".join(
[
f"commit refs/machtry/{branch}",
"mark :1",
f"author {author}",
f"committer {committer}",
data(commit_message),
f"from {current_head}",
"\n".join(
f"M 100644 inline {path}\n{data(content)}"
for path, content in (changed_files or {}).items()
),
f"reset refs/machtry/{branch}",
"from 0000000000000000000000000000000000000000",
"get-mark :1",
"",
]
)
cmd = (str(self._tool), "fast-import", "--quiet")
stdout = subprocess.check_output(
cmd,
cwd=self.path,
env=self._env,
# text=True changes line endings on Windows, and git fast-import
# doesn't like \r\n.
input=fast_import.encode("utf-8"),
)
try_head = stdout.decode("ascii").strip()
yield try_head
# Keep a trace of the temporary commit in the reflog, as if we had actually committed.
# This does update HEAD for a small window of time.
# If we raced with something else that changed the HEAD after we created our
# commit, update-ref will fail and print an error message. Only the update in
# the reflog would be lost in this case.
self._run("update-ref", "-m", "mach try: push", "HEAD", try_head, current_head)
# Likewise, if we raced with something else that updated the HEAD between our
# two update-ref, update-ref will fail and print an error message.
self._run(
"update-ref",
"-m",
"mach try: restore",
"HEAD",
current_head,
try_head,
)
def get_last_modified_time_for_file(self, path: Path):
"""Return last modified in VCS time for the specified file."""
out = self._run("log", "-1", "--format=%ad", "--date=iso", path)
return datetime.strptime(out.strip(), "%Y-%m-%d %H:%M:%S %z")
class SrcRepository(Repository):
"""An implementation of `Repository` for Git repositories."""
def __init__(self, path: Path, src="src"):
super(SrcRepository, self).__init__(path, tool=None)
@property
def name(self):
return "src"
@property
def head_ref(self):
pass
@property
def base_ref(self):
pass
def base_ref_as_hg(self):
pass
@property
def branch(self):
pass
@property
def has_git_cinnabar(self):
pass
def get_commit_time(self):
pass
def sparse_checkout_present(self):
pass
def get_user_email(self):
pass
def get_upstream(self):
pass
def get_changed_files(self, diff_filter="ADM", mode="unstaged", rev=None):
return []
def get_outgoing_files(self, diff_filter="ADM", upstream=None):
return []
def add_remove_files(self, *paths: Union[str, Path]):
pass
def forget_add_remove_files(self, *paths: Union[str, Path]):
pass
def git_ignore(self, path):
"""This function reads the mozilla-central/.gitignore file and creates a
list of the patterns to ignore
"""
ignore = []
f = open(path + "/.gitignore", "r")
while True:
line = f.readline()
if not line:
break
if line.startswith("#"):
pass
elif line.strip() and line not in ["\r", "\r\n"]:
ignore.append(line.strip().lstrip("/"))
f.close()
return ignore
def get_files(self, path):
"""This function gets all files in your source folder e.g mozilla-central
and creates a list of that
"""
res = []
# Ideally any .git or .hg folder under the path would be skipped here, to
# make it easier to test this against a hg/git checkout.
for root, dirs, files in os.walk(self.path):
base = os.path.relpath(root, self.path)
for name in files:
res.append(os.path.join(base, name))
return res
def get_tracked_files_finder(self, path):
"""Get files, similar to 'hg files -0' or 'git ls-files -z', thats why
we read the .gitignore file for patterns to ignore.
Speed could probably be improved.
"""
import fnmatch
files = list(
p.replace("\\", "/").replace("./", "") for p in self.get_files(path) if p
)
files.sort()
ig = self.git_ignore(path)
mat = []
for i in ig:
x = fnmatch.filter(files, i)
if x:
mat = mat + x
match = list(set(files) - set(mat))
match.sort()
if len(match) == 0:
return None
else:
return FileListFinder(match)
def working_directory_clean(self, untracked=False, ignored=False):
pass
def clean_directory(self, path: Union[str, Path]):
pass
def update(self, ref):
pass
def push_to_try(
self,
message: str,
changed_files: Dict[str, str] = {},
allow_log_capture: bool = False,
):
pass
def set_config(self, name, value):
pass
def get_last_modified_time_for_file(self, path: Path):
"""Return last modified in VCS time for the specified file."""
raise MissingVCSTool
def get_repository_object(
path: Optional[Union[str, Path]], hg="hg", git="git", src="src"
):
"""Get a repository object for the repository at `path`.
If `path` is not a known VCS repository, raise an exception.
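A minimal usage sketch (the path is illustrative):

    repo = get_repository_object(Path("~/mozilla-central").expanduser())
    print(repo.name)  # "hg", "git", or "src"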
"""
# If we provide a path to hg that does not match the on-disk casing (e.g.,
# because `path` was normcased), then the hg fsmonitor extension will call
# watchman with that path and watchman will spew errors.
path = Path(path).resolve()
if (path / ".hg").is_dir():
return HgRepository(path, hg=hg)
elif (path / ".git").exists():
return GitRepository(path, git=git)
elif (path / "config" / "milestone.txt").exists():
return SrcRepository(path, src=src)
else:
raise InvalidRepoPath(f"Unknown VCS, or not a source checkout: {path}")
def get_repository_from_build_config(config):
"""Obtain a repository from the build configuration.
Accepts an object that has ``topsrcdir`` and ``substs`` attributes.
"""
flavor = config.substs.get("VCS_CHECKOUT_TYPE")
# If in build mode, only use what configure found. That way we ensure
# that everything in the build system can be controlled via configure.
if not flavor:
raise MissingConfigureInfo(
"could not find VCS_CHECKOUT_TYPE "
"in build config; check configure "
"output and verify it could find a "
"VCS binary"
)
if flavor == "hg":
return HgRepository(Path(config.topsrcdir), hg=config.substs["HG"])
elif flavor == "git":
return GitRepository(Path(config.topsrcdir), git=config.substs["GIT"])
elif flavor == "src":
return SrcRepository(Path(config.topsrcdir), src=config.substs["SRC"])
else:
raise MissingVCSInfo("unknown VCS_CHECKOUT_TYPE value: %s" % flavor)
def get_repository_from_env():
"""Obtain a repository object by looking at the environment.
If inside a build environment (denoted by presence of a ``buildconfig``
module), VCS info is obtained from it, as found via configure. This allows
us to respect what was passed into configure. Otherwise, we fall back to
scanning the filesystem.
"""
try:
import buildconfig
return get_repository_from_build_config(buildconfig)
except (ImportError, MissingVCSTool):
pass
paths_to_check = [Path.cwd(), *Path.cwd().parents]
for path in paths_to_check:
try:
return get_repository_object(path)
except InvalidRepoPath:
continue
raise MissingVCSInfo(f"Could not find Mercurial or Git checkout for {Path.cwd()}")