Revision control
Copy as Markdown
from __future__ import absolute_import
from __future__ import unicode_literals
from os import getcwd
from sys import stdout
from re import escape as re_escape
from uuid import uuid1
from locale import getpreferredencoding
from codecs import getwriter
try:
    from hashlib import md5
except ImportError:
    import md5
from os import path
from termcolor import colored
__all__ = ('Path', 'Writer', 'file_md5', 'format_to_re',)
def format_to_re(format):
    """Return the regular expression that matches all possible values
    the given Python 2 format string (using %(foo)s placeholders) can
    possibly resolve to.
    Each placeholder in the format string is captured in a named group.
    The difficult part here is inserting unescaped regular expression
    syntax in place of the format variables, while still properly
    escaping the rest.
    See this link for more info on the problem:
    """
    UNIQ = uuid1().hex
    assert UNIQ not in format
    class MarkPlaceholders(dict):
        def __getitem__(self, key):
            return UNIQ + ('(?P<%s>.*?)' % key) + UNIQ
    parts = (format % MarkPlaceholders()).split(UNIQ)
    for i in range(0, len(parts), 2):
        parts[i] = re_escape(parts[i])
    return ''.join(parts)
def file_md5(filename):
    """Generate the md5 hash of the given file.
    """
    h = md5()
    f = open(filename, 'rb')
    try:
        while True:
            # 128 is the md5 digest blocksize
            data = f.read(128*10)
            if not data:
                break
            h.update(data)
        return h.digest()
    finally:
        f.close()
class Path(str):
    """Helper representing a filesystem path that can be "bound" to a base
    path. You can then ask it to render as a relative path to that base.
    """
    def __new__(self, *parts, **kwargs):
        base = kwargs.pop('base', None)
        if kwargs:
            raise TypeError()
        self.base = base
        abs = path.normpath(path.abspath(path.join(*parts)))
        return str.__new__(self, abs)
    @property
    def rel(self):
        """Return this path relative to the base it was bound to.
        """
        base = self.base or getcwd()
        if not hasattr(path, 'relpath'):  # pragma: no cover
            # Python < 2.6 doesn't have relpath, and I don't want
            # to bother with a wbole bunch of code for this. See
            # if we can simply remove the prefix, and if not, 2.5
            # users will have to live with the absolute path.
            if self.path.startswith(base):
                return self.path[len(base)+1:]
            return self.abs
        return path.relpath(self, start=base)
    @property
    def abs(self):
        return self
    def exists(self):
        return path.exists(self)
    @property
    def dir(self):
        return Path(path.dirname(self), base=self.base)
    def hash(self):
        return file_md5(self)
class Writer():
    """Helps printing messages to the output, in a very particular form.
    Supported are two concepts, "actions" and "messages". A message is
    always the child of an action. There is a limited set of action
    types (we call them events). Each event and each message may have a
    "severity". The severity can determine how a message or event is
    rendered (if the terminals supports colors), and will also affect
    whether a action or message is rendered at all, depending on verbosity
    settings.
    If a message exceeds it's action in severity causing the message to
    be visible but the action not, the action will forcably be rendered as
    well. For this reason, the class keeps track of the last message that
    should have been printed.
    There is also a mechanism which allows to delay printing an action.
    That is, you may begin constructing an action and collecting it's
    messages, and only later print it out. You would want to do this if
    the event type can only be determined after the action is completed,
    since it often indicates the outcome.
    """
    # Action types and their default levels
    EVENTS = {
        'info': 'info',
        'mkdir': 'default',
        'updated': 'default',
        'unchanged': 'default',
        'skipped': 'warning',
        'created': 'default',
        'exists': 'default',
        'failed': 'error'
    }
    # Levels and the minimum verbosity required to show them
    LEVELS = {'default': 2, 'warning': 1, 'error': 0, 'info': 3}
    # +2 for [ and ]
    # +1 for additional left padding
    max_event_len = max([len(k) for k in list(EVENTS.keys())]) + 2 + 1
    class Action(dict):
        def __init__(self, writer, *more, **data):
            self.writer = writer
            self.messages = []
            self.is_done = False
            self.awaiting_promotion = False
            dict.__init__(self, {'text': '', 'status': None, 'severity': None})
            self.update(*more, **data)
        def __setitem__(self, name, value):
            if name == 'severity':
                assert value in Writer.LEVELS, 'Not a valid severity value'
            dict.__setitem__(self, name, value)
        def done(self, event, *more, **data):
            """Mark this action as done. This will cause it and it's
            current messages to be printed, provided they pass the
            verbosity threshold, of course.
            """
            assert event in Writer.EVENTS, 'Not a valid event type'
            self['event'] = event
            self.update(*more, **data)
            self.writer._print_action(self)
            if self in self.writer._pending_actions:
                self.writer._pending_actions.remove(self)
            self.is_done = True
            if self.severity == 'error':
                self.writer.erroneous = True
        def update(self, text=None, severity=None, **more_data):
            """Update the message with the given data.
            """
            if text:
                self['text'] = text
            if severity:
                self['severity'] = severity
            dict.update(self, **more_data)
        def message(self, message, severity='info'):
            """Print a message belonging to this action.
            If the action is not yet done, this will be added to
            an internal queue.
            If the action is done, but was not printed because it didn't
            pass the verbosity threshold, it will be printed now.
            By default, all messages use a loglevel of 'info'.
            """
            is_allowed = self.writer.allowed(severity)
            if severity == 'error':
                self.writer.erroneous = True
            if not self.is_done:
                if is_allowed:
                    self.messages.append((message, severity))
            elif is_allowed:
                if self.awaiting_promotion:
                    self.writer._print_action(self, force=True)
                self.writer._print_message(message, severity)
        @property
        def event(self):
            return self['event']
        @property
        def severity(self):
            sev = self['severity']
            if not sev:
                sev = Writer.EVENTS[self.event]
            return sev
    def __init__(self, verbosity=LEVELS['default']):
        self._current_action = None
        self._pending_actions = []
        self.verbosity = verbosity
        self.erroneous = False
        # Create a codec writer wrapping stdout
        self.stdout = getwriter(self.get_encoding())(stdout)
    @staticmethod
    def get_encoding():
        if hasattr(stdout, 'isatty') and stdout.isatty():
            return stdout.encoding
        return getpreferredencoding()
    def action(self, event, *a, **kw):
        action = Writer.Action(self, *a, **kw)
        action.done(event)
        return action
    def begin(self, *a, **kw):
        """Begin a new action, and return it. The action will not be
        printed until you call ``done()`` on it.
        In the meantime, you can attach message to it though, which will
        be printed together with the action once it is "done".
        """
        action = Writer.Action(self, *a, **kw)
        self._pending_actions.append(action)
        return action
    def message(self, *a, **kw):
        """Attach a message to the last action to be completed. This
        includes actions that have not yet been printed (due to not
        passing the threshold), but does not include actions that are
        not yet marked as 'done'.
        """
        self._current_action.message(*a, **kw)
    def finish(self):
        """Close down all pending actions that have been began(), but
        are not yet done.
        Not the sibling of begin()!
        """
        for action in self._pending_actions:
            if not action.is_done:
                action.done('failed')
        self._pending_actions = []
    def allowed(self, severity):
        """Return ``True`` if mesages with this severity pass
        the current verbosity threshold.
        """
        return self.verbosity >= self.LEVELS[severity]
    def _get_style_for_level(self, severity):
        """Return a dict that can be passed as **kwargs to colored().
        """
        # Other colors that work moderately well on both dark and
        # light backgrounds and aren't yet used: cyan, green
        return {
            'default': {'color': 'blue'},
            'info': {},
            'warning': {'color': 'magenta'},
            'error': {'color': 'red'},
        }.get(severity, {})
    def get_style_for_action(self, action):
        """First looks at the event type to determine a style, then
        falls back to severity for good measure.
        """
        try:
            return {
                'info': {},   # alyways render info in default
                'exists': {'color': 'blue'}
            }[action.event]
        except KeyError:
            return self._get_style_for_level(action.severity)
    def _print_action(self, action, force=False):
        """Print the action and all it's attached messages.
        """
        if force or self.allowed(action.severity) or action.messages:
            self._print_action_header(action)
            for m, severity in action.messages:
                self._print_message(m, severity)
            action.awaiting_promotion = False
        else:
            # Indicates that this message has not been printed yet,
            # and is waiting for a dependent message that needs to
            # be printed to trigger it.
            action.awaiting_promotion = True
        self._current_action = action
    def _print_action_header(self, action):
        text = action['text']
        status = action['status']
        if isinstance(text, Path):
            # Handle Path instances manually. This doesn't happen
            # automatically because we haven't figur out how to make
            # that class represent itself through the relative path
            # by default, while still returning the full path if it
            # is used, say, during an open() operation.
            text = text.rel
        if status:
            text = "%s (%s)" % (text, status)
        tag = "[%s]" % action['event']
        style = self.get_style_for_action(action)
        self.stdout.write(colored("%*s" % (self.max_event_len, tag), attrs=['bold'], **style))
        self.stdout.write(" ")
        self.stdout.write(colored(text, **style))
        self.stdout.write("\n")
    def _print_message(self, message, severity):
        style = self._get_style_for_level(severity)
        self.stdout.write(colored(" "*(self.max_event_len+1) + "- %s" % message,
                          **style))
        self.stdout.write("\n")