# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import codecs
import fnmatch
import io
import json
import os
import shutil
import sys
import types
from io import StringIO
from .filters import DEFAULT_FILTERS, enabled, filterlist
from .filters import exists as _exists
from .ini import read_ini
from .logger import Logger
from .toml import read_toml
__all__ = ["ManifestParser", "TestManifest", "convert"]
relpath = os.path.relpath
# path normalization
def normalize_path(path):
"""normalize a relative path"""
if sys.platform.startswith("win"):
return path.replace("/", os.path.sep)
return path
def denormalize_path(path):
"""denormalize a relative path"""
if sys.platform.startswith("win"):
return path.replace(os.path.sep, "/")
return path
# objects for parsing manifests
class ManifestParser(object):
"""read .ini manifests"""
def __init__(
self,
manifests=(),
defaults=None,
strict=True,
rootdir=None,
finder=None,
handle_defaults=True,
use_toml=True,
document=False,
):
"""Creates a ManifestParser from the given manifest files.
:param manifests: An iterable of file paths or file objects corresponding
to manifests. If a file path refers to a manifest file that
does not exist, an IOError is raised.
:param defaults: Variables to pre-define in the environment for evaluating
expressions in manifests.
:param strict: If False, the provided manifests may contain references to
listed (test) files that do not exist without raising an
IOError during reading, and certain errors in manifests
are not considered fatal. Those errors include duplicate
section names, redefining variables, and defining empty
variables.
:param rootdir: The directory used as the basis for conversion to and from
relative paths during manifest reading.
:param finder: If provided, this finder object will be used for filesystem
interactions. Finder objects are part of the mozpack package,
documented at
:param handle_defaults: If not set, do not propagate manifest defaults to individual
test objects. Callers are expected to manage per-manifest
defaults themselves via the manifest_defaults member
variable in this case.
:param use_toml: If True, *.toml configuration files will be used if present
in the same location as *.ini files (this applies to included files as
well). If False, only *.ini files will be considered. (defaults to True)
:param document: If True, the parsed `tomlkit` document for each *.toml
configuration file will be preserved in self.source_documents[filename]
(defaults to False)
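A minimal construction sketch (the manifest path and the default value
shown here are hypothetical):

    parser = ManifestParser(manifests=("path/to/mochitest.toml",),
                            defaults={"os": "linux"})
    for test in parser.tests:
        print(test["name"], test["relpath"])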
"""
self._defaults = defaults or {}
self.tests = []
self.manifest_defaults = {}
self.source_files = set()
self.source_documents = {} # source document for each filename (optional)
self.strict = strict
self.rootdir = rootdir
self._root = None
self.finder = finder
self._handle_defaults = handle_defaults
self.use_toml = use_toml
self.document = document
self.logger = Logger()
if manifests:
self.read(*manifests)
def path_exists(self, path):
if self.finder:
return self.finder.get(path) is not None
return os.path.exists(path)
@property
def root(self):
if not self._root:
if self.rootdir is None:
self._root = ""
else:
assert os.path.isabs(self.rootdir)
self._root = self.rootdir + os.path.sep
return self._root
def relative_to_root(self, path):
# Microoptimization, because relpath is quite expensive.
# We know that rootdir is an absolute path or empty. If path
# starts with rootdir, then path is also absolute and the tail
# of the path is the relative path (possibly non-normalized,
# when here is unknown).
# For this to work rootdir needs to be terminated with a path
# separator, so that references to sibling directories with
# a common prefix don't get miscomputed (e.g. /root and
# /rootbeer/file).
# When the rootdir is unknown, the relpath needs to be left
# unchanged. We use an empty string as rootdir in that case,
# which leaves relpath unchanged after slicing.
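# For example (hypothetical paths): with rootdir "/root", self.root is
# "/root/"; "/root/suite/test.toml" is sliced to "suite/test.toml", while
# "/rootbeer/file" does not share the prefix and falls back to relpath().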
if path.startswith(self.root):
return path[len(self.root) :]
else:
return relpath(path, self.root)
# methods for reading manifests
def _get_fp_filename(self, filename):
# get directory of this file if not file-like object
if isinstance(filename, str):
# If we're using mercurial as our filesystem via a finder
# during manifest reading, the getcwd() calls that happen
# with abspath calls will not be meaningful, so absolute
# paths are required.
if self.finder:
assert os.path.isabs(filename)
filename = os.path.abspath(filename)
if self.finder:
fp = codecs.getreader("utf-8")(self.finder.get(filename).open())
else:
fp = io.open(filename, encoding="utf-8")
else:
fp = filename
if hasattr(fp, "name"):
filename = os.path.abspath(fp.name)
else:
filename = None
return fp, filename
def _read(self, root, filename, defaults, parentmanifest=None):
"""
Internal recursive method for reading and parsing manifests.
Stores all found tests in self.tests
:param root: The base path
:param filename: File object or string path for the base manifest file
:param defaults: Options that apply to all items
:param parentmanifest: Filename of the parent manifest, relative to rootdir (default None)
"""
def read_file(type):
include_file = section.split(type, 1)[-1]
include_file = normalize_path(include_file)
if not os.path.isabs(include_file):
include_file = os.path.join(here, include_file)
file_base, file_ext = os.path.splitext(include_file)
if file_ext == ".ini":
toml_name = file_base + ".toml"
if self.path_exists(toml_name):
if self.use_toml:
include_file = toml_name
else:
self.logger.debug_ci(
f"NOTE TOML include file present, but not used: {toml_name}"
)
elif file_ext != ".toml":
raise IOError(
f"manfestparser file extension not supported: {include_file}"
)
if not self.path_exists(include_file):
message = "Included file '%s' does not exist" % include_file
if self.strict:
raise IOError(message)
else:
sys.stderr.write("%s\n" % message)
return
return include_file
# assume we are reading an INI file
read_fn = read_ini
fp, filename = self._get_fp_filename(filename)
manifest_defaults_filename = filename  # keeps the original name even if a TOML variant is read
if filename is None:
filename_rel = None
here = root
file_base = file_ext = None
else:
self.source_files.add(filename)
filename_rel = self.relative_to_root(filename)
here = os.path.dirname(filename)
file_base, file_ext = os.path.splitext(filename)
if file_ext == ".ini":
toml_name = file_base + ".toml"
if self.path_exists(toml_name):
if self.use_toml:
fp, filename = self._get_fp_filename(toml_name)
read_fn = read_toml
else:
self.logger.debug_ci(
f"NOTE TOML present, but not used: {toml_name}"
)
elif file_ext == ".toml":
read_fn = read_toml
else:
raise IOError(f"manfestparser file extension not supported: {filename}")
defaults["here"] = here
# read the configuration
sections, defaults, document = read_fn(
fp=fp,
defaults=defaults,
strict=self.strict,
handle_defaults=self._handle_defaults,
document=self.document,
)
if filename is not None:
self.source_documents[filename] = document
if parentmanifest and filename:
# A manifest can be read multiple times, via "include:", optionally
# with section-specific variables. These variables only apply to
# the included manifest when included via the same parent manifest,
# so they must be associated with (parentmanifest, filename).
#
# |defaults| is a combination of variables, in the following order:
# - The defaults of the ancestor manifests if self._handle_defaults
# is True.
# - Any variables from the "[include:...]" section.
# - The defaults of the included manifest.
self.manifest_defaults[
(parentmanifest, manifest_defaults_filename)
] = defaults
if manifest_defaults_filename != filename:
self.manifest_defaults[(parentmanifest, filename)] = defaults
else:
self.manifest_defaults[manifest_defaults_filename] = defaults
if manifest_defaults_filename != filename:
self.manifest_defaults[filename] = defaults
# get the tests
for section, data in sections:
# a file to include
# TODO: keep track of included file structure:
# self.manifests = {'manifest.ini': 'relative/path.ini'}
if section.startswith("include:"):
include_file = read_file("include:")
if include_file:
include_defaults = data.copy()
self._read(
root,
include_file,
include_defaults,
parentmanifest=filename_rel,
)
continue
# otherwise an item
test = data.copy()
test["name"] = section
# Will be None if the manifest being read is a file-like object.
test["manifest"] = filename
test["manifest_relpath"] = None
if filename:
test["manifest_relpath"] = filename_rel
# determine the path
path = test.get("path", section)
_relpath = path
if "://" not in path: # don't futz with URLs
path = normalize_path(path)
if here and not os.path.isabs(path):
# Profiling indicates 25% of manifest parsing is spent
# in this call to normpath, but almost all calls return
# their argument unmodified, so we avoid the call if
# '..' is not present in the path.
path = os.path.join(here, path)
if ".." in path:
path = os.path.normpath(path)
_relpath = self.relative_to_root(path)
test["path"] = path
test["relpath"] = _relpath
if parentmanifest is not None:
# If a test was included by a parent manifest we may need to
# indicate that in the test object for the sake of identifying
# a test, particularly in the case a test file is included by
# multiple manifests.
test["ancestor_manifest"] = parentmanifest
# append the item
self.tests.append(test)
def read(self, *filenames, **defaults):
"""
read and add manifests from file paths or file-like objects
filenames -- file paths or file-like objects to read as manifests
defaults -- default variables
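A usage sketch (the file names and the default value are hypothetical):

    parser = ManifestParser()
    parser.read("suite-a.toml", "suite-b.toml", subsuite="media")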
"""
# ensure all files exist
missing = [
filename
for filename in filenames
if isinstance(filename, str) and not self.path_exists(filename)
]
if missing:
raise IOError("Missing files: %s" % ", ".join(missing))
# default variables
_defaults = defaults.copy() or self._defaults.copy()
_defaults.setdefault("here", None)
# process each file
for filename in filenames:
# set the per file defaults
defaults = _defaults.copy()
here = None
if isinstance(filename, str):
here = os.path.dirname(os.path.abspath(filename))
elif hasattr(filename, "name"):
here = os.path.dirname(os.path.abspath(filename.name))
if here:
defaults["here"] = here # directory of master .ini file
if self.rootdir is None:
# set the root directory
# == the directory of the first manifest given
self.rootdir = here
self._read(here, filename, defaults)
# methods for querying manifests
def query(self, *checks, **kw):
"""
general query function for tests
- checks : callable conditions to test if the test fulfills the query
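Example (an illustrative check):

    long_names = parser.query(lambda test: len(test["name"]) > 40)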
"""
tests = kw.get("tests", None)
if tests is None:
tests = self.tests
retval = []
for test in tests:
for check in checks:
if not check(test):
break
else:
retval.append(test)
return retval
def get(self, _key=None, inverse=False, tags=None, tests=None, **kwargs):
# TODO: pass a dict instead of kwargs since you might have
# e.g. 'inverse' as a key in the dict
# TODO: tags should just be part of kwargs with None values
# (None == any is kinda weird, but probably still better)
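# Illustrative calls (the keys shown are hypothetical manifest variables):
#   self.get("name", subsuite="media")  -> names of tests with that value
#   self.get(tags=["disabled"])         -> tests that define a 'disabled' key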
# fix up tags
if tags:
tags = set(tags)
else:
tags = set()
# make some check functions
if inverse:
def has_tags(test):
return not tags.intersection(test.keys())
def dict_query(test):
for key, value in list(kwargs.items()):
if test.get(key) == value:
return False
return True
else:
def has_tags(test):
return tags.issubset(test.keys())
def dict_query(test):
for key, value in list(kwargs.items()):
if test.get(key) != value:
return False
return True
# query the tests
tests = self.query(has_tags, dict_query, tests=tests)
# if a key is given, return only a list of that key
# useful for keys like 'name' or 'path'
if _key:
return [test[_key] for test in tests]
# return the tests
return tests
def manifests(self, tests=None):
"""
return manifests in order in which they appear in the tests
If |tests| is not set, the order of the manifests is unspecified.
"""
if tests is None:
manifests = []
# Make sure to return all the manifests, even ones without tests.
for m in list(self.manifest_defaults.keys()):
if isinstance(m, tuple):
_parentmanifest, manifest = m
else:
manifest = m
if manifest not in manifests:
manifests.append(manifest)
return manifests
manifests = []
for test in tests:
manifest = test.get("manifest")
if not manifest:
continue
if manifest not in manifests:
manifests.append(manifest)
return manifests
def paths(self):
return [i["path"] for i in self.tests]
# methods for auditing
def missing(self, tests=None):
"""
return list of tests that do not exist on the filesystem
"""
if tests is None:
tests = self.tests
existing = list(_exists(tests, {}))
return [t for t in tests if t not in existing]
def check_missing(self, tests=None):
missing = self.missing(tests=tests)
if missing:
missing_paths = [test["path"] for test in missing]
if self.strict:
raise IOError(
"Strict mode enabled, test paths must exist. "
"The following test(s) are missing: %s"
% json.dumps(missing_paths, indent=2)
)
print(
"Warning: The following test(s) are missing: %s"
% json.dumps(missing_paths, indent=2),
file=sys.stderr,
)
return missing
def verifyDirectory(self, directories, pattern=None, extensions=None):
"""
checks what is on the filesystem vs what is in a manifest
returns a 2-tuple of sets:
(missing_from_filesystem, missing_from_manifest)
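Example (the directory, pattern, and extension are hypothetical):

    missing_fs, missing_manifest = parser.verifyDirectory(
        "tests", pattern="test_*", extensions=(".js",)
    )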
"""
files = set([])
if isinstance(directories, str):
directories = [directories]
# get files in directories
for directory in directories:
for dirpath, _dirnames, fnames in os.walk(directory, topdown=True):
filenames = fnames
# only add files that match a pattern
if pattern:
filenames = fnmatch.filter(filenames, pattern)
# only add files that have one of the extensions
if extensions:
filenames = [
filename
for filename in filenames
if os.path.splitext(filename)[-1] in extensions
]
files.update(
[os.path.join(dirpath, filename) for filename in filenames]
)
paths = set(self.paths())
missing_from_filesystem = paths.difference(files)
missing_from_manifest = files.difference(paths)
return (missing_from_filesystem, missing_from_manifest)
# methods for output
def write(
self,
fp=sys.stdout,
rootdir=None,
global_tags=None,
global_kwargs=None,
local_tags=None,
local_kwargs=None,
):
"""
write a manifest given a query
global and local options will be munged to do the query
globals will be written to the top of the file
locals (if given) will be written per test
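Usage sketch (the output path and the variable are hypothetical):

    parser.write(fp="filtered.ini", global_kwargs={"subsuite": "media"})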
"""
# open file if `fp` given as string
close = False
if isinstance(fp, str):
fp = open(fp, "w")
close = True
# root directory
if rootdir is None:
rootdir = self.rootdir
# sanitize input
global_tags = global_tags or set()
local_tags = local_tags or set()
global_kwargs = global_kwargs or {}
local_kwargs = local_kwargs or {}
# create the query
tags = set([])
tags.update(global_tags)
tags.update(local_tags)
kwargs = {}
kwargs.update(global_kwargs)
kwargs.update(local_kwargs)
# get matching tests
tests = self.get(tags=tags, **kwargs)
# print the .ini manifest
if global_tags or global_kwargs:
print("[DEFAULT]", file=fp)
for tag in global_tags:
print("%s =" % tag, file=fp)
for key, value in list(global_kwargs.items()):
print("%s = %s" % (key, value), file=fp)
print(file=fp)
for t in tests:
test = t.copy() # don't overwrite
path = test["name"]
if not os.path.isabs(path):
path = test["path"]
if self.rootdir:
path = relpath(test["path"], self.rootdir)
path = denormalize_path(path)
print("[%s]" % path, file=fp)
# reserved keywords:
reserved = [
"path",
"name",
"here",
"manifest",
"manifest_relpath",
"relpath",
"ancestor_manifest",
]
for key in sorted(test.keys()):
if key in reserved:
continue
if key in global_kwargs:
continue
if key in global_tags and not test[key]:
continue
print("%s = %s" % (key, test[key]), file=fp)
print(file=fp)
if close:
# close the created file
fp.close()
def __str__(self):
fp = StringIO()
self.write(fp=fp)
value = fp.getvalue()
return value
def copy(self, directory, rootdir=None, *tags, **kwargs):
"""
copy the manifests and associated tests
- directory : directory to copy to
- rootdir : root directory to copy to (if not given from manifests)
- tags : keywords the tests must have
- kwargs : key, values the tests must match
"""
# XXX note that copy does *not* filter the tests out of the
# resulting manifest; it just stupidly copies them over.
# ideally, it would reread the manifests and filter out the
# tests that don't match *tags and **kwargs
# destination
if not os.path.exists(directory):
os.makedirs(directory)
else:
# sanity check
assert os.path.isdir(directory)
# tests to copy
tests = self.get(tags=tags, **kwargs)
if not tests:
return # nothing to do!
# root directory
if rootdir is None:
rootdir = self.rootdir
# copy the manifests + tests
manifests = [relpath(manifest, rootdir) for manifest in self.manifests()]
for manifest in manifests:
destination = os.path.join(directory, manifest)
dirname = os.path.dirname(destination)
if not os.path.exists(dirname):
os.makedirs(dirname)
else:
# sanity check
assert os.path.isdir(dirname)
shutil.copy(os.path.join(rootdir, manifest), destination)
missing = self.check_missing(tests)
tests = [test for test in tests if test not in missing]
for test in tests:
if os.path.isabs(test["name"]):
continue
source = test["path"]
destination = os.path.join(directory, relpath(test["path"], rootdir))
shutil.copy(source, destination)
# TODO: ensure that all of the tests are below the from_dir
def update(self, from_dir, rootdir=None, *tags, **kwargs):
"""
update the tests as listed in a manifest from a directory
- from_dir : directory where the tests live
- rootdir : root directory to copy to (if not given from manifests)
- tags : keys the tests must have
- kwargs : key, values the tests must match
"""
# get the tests
tests = self.get(tags=tags, **kwargs)
# get the root directory
if not rootdir:
rootdir = self.rootdir
# copy them!
for test in tests:
if not os.path.isabs(test["name"]):
_relpath = relpath(test["path"], rootdir)
source = os.path.join(from_dir, _relpath)
if not os.path.exists(source):
message = "Missing test: '%s' does not exist!"
if self.strict:
raise IOError(message)
print(message + " Skipping.", file=sys.stderr)
continue
destination = os.path.join(rootdir, _relpath)
shutil.copy(source, destination)
# directory importers
@classmethod
def _walk_directories(cls, directories, callback, pattern=None, ignore=()):
"""
internal function to import directories
"""
if isinstance(pattern, str):
patterns = [pattern]
else:
patterns = pattern
ignore = set(ignore)
if not patterns:
def accept_filename(filename):
return True
else:
def accept_filename(filename):
for pattern in patterns:
if fnmatch.fnmatch(filename, pattern):
return True
if not ignore:
def accept_dirname(dirname):
return True
else:
def accept_dirname(dirname):
return dirname not in ignore
rootdirectories = directories[:]
seen_directories = set()
for rootdirectory in rootdirectories:
# let's recurse directories using list
directories = [os.path.realpath(rootdirectory)]
while directories:
directory = directories.pop(0)
if directory in seen_directories:
# eliminate possible infinite recursion due to
# symbolic links
continue
seen_directories.add(directory)
files = []
subdirs = []
for name in sorted(os.listdir(directory)):
path = os.path.join(directory, name)
if os.path.isfile(path):
# os.path.isfile follows symbolic links, so we don't
# need to handle them here.
if accept_filename(name):
files.append(name)
continue
elif os.path.islink(path):
# eliminate symbolic links
path = os.path.realpath(path)
# we must have a directory here
if accept_dirname(name):
subdirs.append(name)
# this subdir is added for recursion
directories.insert(0, path)
# here we got all subdirs and files filtered, we can
# call the callback function if directory is not empty
if subdirs or files:
callback(rootdirectory, directory, subdirs, files)
@classmethod
def populate_directory_manifests(
cls, directories, filename, pattern=None, ignore=(), overwrite=False
):
"""
walks directories and writes manifests of name `filename` in-place;
returns `cls` instance populated with the given manifests
filename -- filename of manifests to write
pattern -- shell pattern (glob) or patterns of filenames to match
ignore -- directory names to ignore
overwrite -- whether to overwrite existing files of given name
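Example (the directory and pattern are hypothetical):

    parser = ManifestParser.populate_directory_manifests(
        ["tests/unit"], filename="manifest.ini", pattern="test_*"
    )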
"""
manifest_dict = {}
if os.path.basename(filename) != filename:
raise IOError("filename should not include directory name")
# no need to hit directories more than once
_directories = directories
directories = []
for directory in _directories:
if directory not in directories:
directories.append(directory)
def callback(directory, dirpath, dirnames, filenames):
"""write a manifest for each directory"""
manifest_path = os.path.join(dirpath, filename)
# skip a pre-existing manifest unless overwrite was requested
if (dirnames or filenames) and not (
os.path.exists(manifest_path) and not overwrite
):
with open(manifest_path, "w") as manifest:
for dirname in dirnames:
print(
"[include:%s]" % os.path.join(dirname, filename),
file=manifest,
)
for _filename in filenames:
print("[%s]" % _filename, file=manifest)
# add to list of manifests
manifest_dict.setdefault(directory, manifest_path)
# walk the directories to gather files
cls._walk_directories(directories, callback, pattern=pattern, ignore=ignore)
# get manifests
manifests = [manifest_dict[directory] for directory in _directories]
# create a `cls` instance with the manifests
return cls(manifests=manifests)
@classmethod
def from_directories(
cls, directories, pattern=None, ignore=(), write=None, relative_to=None
):
"""
convert directories to a simple manifest; returns ManifestParser instance
pattern -- shell pattern (glob) or patterns of filenames to match
ignore -- directory names to ignore
write -- filename or file-like object of manifests to write;
if `None` then a StringIO instance will be created
relative_to -- write paths relative to this path;
if false then the paths are absolute
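Example (the directory name and pattern are hypothetical):

    parser = ManifestParser.from_directories(
        ["tests/unit"], pattern="test_*.js", relative_to="tests/unit"
    )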
"""
# determine output
opened_manifest_file = None # name of opened manifest file
absolute = not relative_to # whether to output absolute path names as names
if isinstance(write, str):
opened_manifest_file = write
write = open(write, "w")
if write is None:
write = StringIO()
# walk the directories, generating manifests
def callback(directory, dirpath, dirnames, filenames):
# absolute paths
filenames = [os.path.join(dirpath, filename) for filename in filenames]
# ensure new manifest isn't added
filenames = [
filename for filename in filenames if filename != opened_manifest_file
]
# normalize paths
if not absolute and relative_to:
filenames = [relpath(filename, relative_to) for filename in filenames]
# write to manifest
write_content = "\n".join(
["[{}]".format(denormalize_path(filename)) for filename in filenames]
)
print(write_content, file=write)
cls._walk_directories(directories, callback, pattern=pattern, ignore=ignore)
if opened_manifest_file:
# close file
write.close()
manifests = [opened_manifest_file]
else:
# manifests/write is a file-like object;
# rewind buffer
write.flush()
write.seek(0)
manifests = [write]
# make a ManifestParser instance
return cls(manifests=manifests)
convert = ManifestParser.from_directories
class TestManifest(ManifestParser):
"""
apply logic to manifests; this is your integration layer :)
specific harnesses may subclass from this if they need more logic
"""
def __init__(self, *args, **kwargs):
ManifestParser.__init__(self, *args, **kwargs)
self.filters = filterlist(DEFAULT_FILTERS)
self.last_used_filters = []
def active_tests(
self, exists=True, disabled=True, filters=None, noDefaultFilters=False, **values
):
"""
Run all applied filters on the set of tests.
:param exists: filter out non-existing tests (default True)
:param disabled: whether to return disabled tests (default True)
:param values: keys and values to filter on (e.g. `os = linux mac`)
:param filters: list of filters to apply to the tests
:param noDefaultFilters: if True, do not apply the default filter set (default False)
:returns: list of test objects that were not filtered out
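Typical use (the manifest path and filter values are hypothetical):

    manifest = TestManifest(manifests=("mochitest.toml",), strict=False)
    tests = manifest.active_tests(disabled=False, os="linux", bits="64")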
"""
tests = [i.copy() for i in self.tests] # shallow copy
# mark all tests as passing
for test in tests:
test["expected"] = test.get("expected", "pass")
# make a copy so original doesn't get modified
if noDefaultFilters:
fltrs = []
else:
fltrs = self.filters[:]
if exists:
if self.strict:
self.check_missing(tests)
else:
fltrs.append(_exists)
if not disabled:
fltrs.append(enabled)
if filters:
fltrs += filters
self.last_used_filters = fltrs[:]
for fn in fltrs:
tests = fn(tests, values)
return list(tests)
def test_paths(self):
return [test["path"] for test in self.active_tests()]
def fmt_filters(self, filters=None):
filters = filters or self.last_used_filters
names = []
for f in filters:
if isinstance(f, types.FunctionType):
names.append(f.__name__)
else:
names.append(str(f))
return ", ".join(names)