Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"Mozilla l10n compare locales tool"
import codecs
import os
import shutil
import re
from compare_locales import parser
from compare_locales import mozpath
from compare_locales.checks import getChecker, EntityPos
from compare_locales.keyedtuple import KeyedTuple
from .observer import ObserverList
from .utils import AddRemove
class ContentComparer:
    """Compare a localized file against its reference file.

    Results are reported through `self.observers`; when a merge path is
    given, a merged copy of the localized file is written as well.
    """

    # Entity ids matching this pattern are tallied as "keys" in compare()
    # instead of being counted as changed or unchanged.
    keyRE = re.compile("[kK]ey")
    # Newline pattern; not referenced by the methods of this class.
    nl = re.compile("\n", re.M)

    def __init__(self, quiet=0):
        """Create a ContentComparer.

        `quiet` is handed to the ObserverList stored in `self.observers`.
        The return values of its notify method are used to control the
        handling of missing and obsolete entities.
        """
        self.observers = ObserverList(quiet=quiet)

    def create_merge_dir(self, merge_file):
        """Ensure the directory that will contain `merge_file` exists."""
        outdir = mozpath.dirname(merge_file)
        # os.makedirs("") raises FileNotFoundError; an empty dirname
        # (presumably a bare file name) means there is nothing to create.
        if outdir:
            os.makedirs(outdir, exist_ok=True)

    def merge(
        self,
        ref_entities,
        ref_file,
        l10n_file,
        merge_file,
        missing,
        skips,
        ctx,
        capabilities,
        encoding,
    ):
        """Create localized file in merge dir

        `ref_entities` is the parser result of the reference file, looked
        up by entity key
        `ref_file` and `l10n_file` are the File objects for the reference and
        the l10n file, resp.
        `merge_file` is the output path for the generated content. This is None
        if we're just comparing or validating.
        `missing` are the missing messages in l10n - potentially copied from
        reference
        `skips` are entries to be dropped from the localized file; the list
        is sorted in place by span start
        `ctx` is the parsing context, its `contents` is the l10n file content
        `capabilities` are the capabilities for the merge algorithm
        `encoding` is the encoding to be used when serializing, usually utf-8

        Returns None. Nothing is written if `merge_file` is falsy or if
        `capabilities` is `parser.CAN_NONE`.
        """
        if not merge_file:
            return
        if capabilities == parser.CAN_NONE:
            return
        self.create_merge_dir(merge_file)

        if capabilities & parser.CAN_COPY:
            # copy the l10n file if it's good, or the reference file if not
            if skips or missing:
                src = ref_file.fullpath
            else:
                src = l10n_file.fullpath
            shutil.copyfile(src, merge_file)
            # NOTE: this message is printed for either source file.
            print("copied reference to " + merge_file)
            return

        if not (capabilities & parser.CAN_SKIP):
            return

        # Start with None in case the merge file doesn't need to be created.
        f = None
        try:
            if skips:
                # skips come in ordered by key name, we need them in file order
                skips.sort(key=lambda s: s.span[0])
                # we need to skip a few erroneous blocks in the input,
                # copy by hand
                f = codecs.open(merge_file, "wb", encoding)
                offset = 0
                for skip in skips:
                    chunk = skip.span
                    f.write(ctx.contents[offset : chunk[0]])
                    offset = chunk[1]
                f.write(ctx.contents[offset:])
            if f is None:
                # l10n file is a good starting point
                shutil.copyfile(l10n_file.fullpath, merge_file)

            if not (capabilities & parser.CAN_MERGE):
                return

            if skips or missing:
                if f is None:
                    f = codecs.open(merge_file, "ab", encoding)
                # Append the reference version of every missing entity and
                # of every skipped entity that is not parser junk.
                trailing = (
                    ["\n"]
                    + [ref_entities[key].all for key in missing]
                    + [
                        ref_entities[skip.key].all
                        for skip in skips
                        if not isinstance(skip, parser.Junk)
                    ]
                )
                print("adding to " + merge_file)
                f.write(
                    "".join(
                        s if s.endswith("\n") else s + "\n" for s in trailing
                    )
                )
        finally:
            # Close the output even if one of the writes above failed.
            if f is not None:
                f.close()

    def remove(self, ref_file, l10n, merge_file):
        """Obsolete l10n file.

        Copy to merge stage if we can.
        """
        self.observers.notify("obsoleteFile", l10n, None)
        self.merge(
            KeyedTuple([]),
            ref_file,
            l10n,
            merge_file,
            [],
            [],
            None,
            parser.CAN_COPY,
            None,
        )

    def compare(self, ref_file, l10n, merge_file, extra_tests=None):
        """Compare the localized file `l10n` against `ref_file`.

        Findings are sent to `self.observers` via notify, and the per-file
        counts are sent via updateStats.

        `ref_file` and `l10n` are the File objects for the reference and
        the l10n file, resp.
        `merge_file` is the output path for merged content, or None to only
        compare.
        `extra_tests` is passed on to `getChecker`.

        Returns None. If either file fails to be read or parsed, an error
        is reported to the observers and the comparison stops.
        """
        try:
            p = parser.getParser(ref_file.file)
        except UserWarning:
            # no comparison, XXX report?
            # At least, merge
            self.merge(
                KeyedTuple([]),
                ref_file,
                l10n,
                merge_file,
                [],
                [],
                None,
                parser.CAN_COPY,
                None,
            )
            return
        try:
            p.readFile(ref_file)
        except Exception as e:
            self.observers.notify("error", ref_file, str(e))
            return
        ref_entities = p.parse()
        try:
            p.readFile(l10n)
            l10n_entities = p.parse()
            l10n_ctx = p.ctx
        except Exception as e:
            self.observers.notify("error", l10n, str(e))
            return

        ar = AddRemove()
        ar.set_left(ref_entities.keys())
        ar.set_right(l10n_entities.keys())
        report = missing = obsolete = changed = unchanged = keys = 0
        missing_w = changed_w = unchanged_w = 0  # word stats
        missings = []
        skips = []
        checker = getChecker(l10n, extra_tests=extra_tests)
        if checker and checker.needs_reference:
            checker.set_reference(ref_entities)
        for msg in p.findDuplicates(ref_entities):
            self.observers.notify("warning", l10n, msg)
        for msg in p.findDuplicates(l10n_entities):
            self.observers.notify("error", l10n, msg)

        for action, entity_id in ar:
            if action == "delete":
                # missing entity
                if isinstance(ref_entities[entity_id], parser.Junk):
                    self.observers.notify(
                        "warning", l10n, "Parser error in en-US"
                    )
                    continue
                _rv = self.observers.notify("missingEntity", l10n, entity_id)
                if _rv == "ignore":
                    continue
                if _rv == "error":
                    # only add to missing entities for l10n-merge on error,
                    # not report
                    missings.append(entity_id)
                    missing += 1
                    refent = ref_entities[entity_id]
                    missing_w += refent.count_words()
                else:
                    # just report
                    report += 1
            elif action == "add":
                # obsolete entity or junk
                if isinstance(l10n_entities[entity_id], parser.Junk):
                    junk = l10n_entities[entity_id]
                    self.observers.notify("error", l10n, junk.error_message())
                    if merge_file is not None:
                        skips.append(junk)
                elif (
                    self.observers.notify("obsoleteEntity", l10n, entity_id)
                    != "ignore"
                ):
                    obsolete += 1
            else:
                # entity found in both ref and l10n, check for changed
                refent = ref_entities[entity_id]
                l10nent = l10n_entities[entity_id]
                if self.keyRE.search(entity_id):
                    keys += 1
                elif refent.equals(l10nent):
                    self.doUnchanged(l10nent)
                    unchanged += 1
                    unchanged_w += refent.count_words()
                else:
                    self.doChanged(ref_file, refent, l10nent)
                    changed += 1
                    changed_w += refent.count_words()
                # run checks:
                if checker:
                    # An entity may produce several errors; queue it for
                    # skipping only once so that merge() does not append its
                    # reference text multiple times.
                    skip_queued = False
                    for tp, pos, msg, cat in checker.check(refent, l10nent):
                        if isinstance(pos, EntityPos):
                            line, col = l10nent.position(pos)
                        else:
                            line, col = l10nent.value_position(pos)
                        # skip error entities when merging
                        if (
                            tp == "error"
                            and merge_file is not None
                            and not skip_queued
                        ):
                            skips.append(l10nent)
                            skip_queued = True
                        self.observers.notify(
                            tp,
                            l10n,
                            "%s at line %d, column %d for %s"
                            % (msg, line, col, refent.key),
                        )

        if merge_file is not None:
            self.merge(
                ref_entities,
                ref_file,
                l10n,
                merge_file,
                missings,
                skips,
                l10n_ctx,
                p.capabilities,
                p.encoding,
            )

        stats = {
            "missing": missing,
            "missing_w": missing_w,
            "report": report,
            "obsolete": obsolete,
            "changed": changed,
            "changed_w": changed_w,
            "unchanged": unchanged,
            "unchanged_w": unchanged_w,
            "keys": keys,
        }
        self.observers.updateStats(l10n, stats)

    def add(self, orig, missing, merge_file):
        """Add missing localized file.

        `orig` is the reference File, `missing` the File that is absent in
        the localization, and `merge_file` the output path for the merge
        stage. Unless an observer answers "ignore" to the "missingFile"
        notification, the entity and word counts of `orig` are reported
        as missing.
        """
        try:
            p = parser.getParser(orig.file)
        except UserWarning:
            p = None

        # if we don't support this file, assume CAN_COPY to mimick
        # l10n dir as closely as possible
        caps = p.capabilities if p else parser.CAN_COPY
        if caps & (parser.CAN_COPY | parser.CAN_MERGE):
            # even if we can merge, pretend we can only copy; the non-empty
            # `missing` argument makes merge() copy the reference file
            self.merge(
                KeyedTuple([]),
                orig,
                missing,
                merge_file,
                ["trigger copy"],
                [],
                None,
                parser.CAN_COPY,
                None,
            )

        if self.observers.notify("missingFile", missing, None) == "ignore":
            # filter said that we don't need this file, don't count it
            return

        if p is None:
            # We don't have a parser, cannot count missing strings
            return

        try:
            p.readFile(orig)
            entities = p.parse()
        except Exception as ex:
            self.observers.notify("error", orig, str(ex))
            return
        # strip parse errors
        entities = [e for e in entities if not isinstance(e, parser.Junk)]
        self.observers.updateStats(missing, {"missing": len(entities)})
        missing_w = sum(e.count_words() for e in entities)
        self.observers.updateStats(missing, {"missing_w": missing_w})

    def doUnchanged(self, entity):
        """Hook called by compare() for an entity equal to its reference."""
        # overload this if needed
        pass

    def doChanged(self, file, ref_entity, l10n_entity):
        """Hook called by compare() for an entity differing from its reference."""
        # overload this if needed
        pass