# fileset.py - file set queries for mercurial # # Copyright 2010 Matt Mackall # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import errno import re from .i18n import _ from . import ( error, filesetlang, match as matchmod, merge, pycompat, registrar, scmutil, util, ) from .utils import ( stringutil, ) # common weight constants _WEIGHT_CHECK_FILENAME = filesetlang.WEIGHT_CHECK_FILENAME _WEIGHT_READ_CONTENTS = filesetlang.WEIGHT_READ_CONTENTS _WEIGHT_STATUS = filesetlang.WEIGHT_STATUS _WEIGHT_STATUS_THOROUGH = filesetlang.WEIGHT_STATUS_THOROUGH # helpers for processing parsed tree getsymbol = filesetlang.getsymbol getstring = filesetlang.getstring _getkindpat = filesetlang.getkindpat getpattern = filesetlang.getpattern getargs = filesetlang.getargs def getmatch(mctx, x): if not x: raise error.ParseError(_("missing argument")) return methods[x[0]](mctx, *x[1:]) def stringmatch(mctx, x): return mctx.matcher([x]) def kindpatmatch(mctx, x, y): return stringmatch(mctx, _getkindpat(x, y, matchmod.allpatternkinds, _("pattern must be a string"))) def patternsmatch(mctx, *xs): allkinds = matchmod.allpatternkinds patterns = [getpattern(x, allkinds, _("pattern must be a string")) for x in xs] return mctx.matcher(patterns) def andmatch(mctx, x, y): xm = getmatch(mctx, x) ym = getmatch(mctx, y) return matchmod.intersectmatchers(xm, ym) def ormatch(mctx, *xs): ms = [getmatch(mctx, x) for x in xs] return matchmod.unionmatcher(ms) def notmatch(mctx, x): m = getmatch(mctx, x) return mctx.predicate(lambda f: not m(f), predrepr=('', m)) def minusmatch(mctx, x, y): xm = getmatch(mctx, x) ym = getmatch(mctx, y) return matchmod.differencematcher(xm, ym) def listmatch(mctx, *xs): raise error.ParseError(_("can't use a list in this context"), hint=_('see \'hg help "filesets.x or y"\'')) def func(mctx, a, b): funcname = getsymbol(a) if funcname in symbols: return symbols[funcname](mctx, b) keep = lambda fn: getattr(fn, '__doc__', None) is not None syms = [s for (s, fn) in symbols.items() if keep(fn)] raise error.UnknownIdentifier(funcname, syms) # symbols are callable like: # fun(mctx, x) # with: # mctx - current matchctx instance # x - argument in tree form symbols = filesetlang.symbols # filesets using matchctx.status() _statuscallers = set() predicate = registrar.filesetpredicate() @predicate('modified()', callstatus=True, weight=_WEIGHT_STATUS) def modified(mctx, x): """File that is modified according to :hg:`status`. """ # i18n: "modified" is a keyword getargs(x, 0, 0, _("modified takes no arguments")) s = set(mctx.status().modified) return mctx.predicate(s.__contains__, predrepr='modified') @predicate('added()', callstatus=True, weight=_WEIGHT_STATUS) def added(mctx, x): """File that is added according to :hg:`status`. """ # i18n: "added" is a keyword getargs(x, 0, 0, _("added takes no arguments")) s = set(mctx.status().added) return mctx.predicate(s.__contains__, predrepr='added') @predicate('removed()', callstatus=True, weight=_WEIGHT_STATUS) def removed(mctx, x): """File that is removed according to :hg:`status`. """ # i18n: "removed" is a keyword getargs(x, 0, 0, _("removed takes no arguments")) s = set(mctx.status().removed) return mctx.predicate(s.__contains__, predrepr='removed') @predicate('deleted()', callstatus=True, weight=_WEIGHT_STATUS) def deleted(mctx, x): """Alias for ``missing()``. """ # i18n: "deleted" is a keyword getargs(x, 0, 0, _("deleted takes no arguments")) s = set(mctx.status().deleted) return mctx.predicate(s.__contains__, predrepr='deleted') @predicate('missing()', callstatus=True, weight=_WEIGHT_STATUS) def missing(mctx, x): """File that is missing according to :hg:`status`. """ # i18n: "missing" is a keyword getargs(x, 0, 0, _("missing takes no arguments")) s = set(mctx.status().deleted) return mctx.predicate(s.__contains__, predrepr='deleted') @predicate('unknown()', callstatus=True, weight=_WEIGHT_STATUS_THOROUGH) def unknown(mctx, x): """File that is unknown according to :hg:`status`.""" # i18n: "unknown" is a keyword getargs(x, 0, 0, _("unknown takes no arguments")) s = set(mctx.status().unknown) return mctx.predicate(s.__contains__, predrepr='unknown') @predicate('ignored()', callstatus=True, weight=_WEIGHT_STATUS_THOROUGH) def ignored(mctx, x): """File that is ignored according to :hg:`status`.""" # i18n: "ignored" is a keyword getargs(x, 0, 0, _("ignored takes no arguments")) s = set(mctx.status().ignored) return mctx.predicate(s.__contains__, predrepr='ignored') @predicate('clean()', callstatus=True, weight=_WEIGHT_STATUS) def clean(mctx, x): """File that is clean according to :hg:`status`. """ # i18n: "clean" is a keyword getargs(x, 0, 0, _("clean takes no arguments")) s = set(mctx.status().clean) return mctx.predicate(s.__contains__, predrepr='clean') @predicate('tracked()') def tracked(mctx, x): """File that is under Mercurial control.""" # i18n: "tracked" is a keyword getargs(x, 0, 0, _("tracked takes no arguments")) return mctx.predicate(mctx.ctx.__contains__, predrepr='tracked') @predicate('binary()', weight=_WEIGHT_READ_CONTENTS) def binary(mctx, x): """File that appears to be binary (contains NUL bytes). """ # i18n: "binary" is a keyword getargs(x, 0, 0, _("binary takes no arguments")) return mctx.fpredicate(lambda fctx: fctx.isbinary(), predrepr='binary', cache=True) @predicate('exec()') def exec_(mctx, x): """File that is marked as executable. """ # i18n: "exec" is a keyword getargs(x, 0, 0, _("exec takes no arguments")) ctx = mctx.ctx return mctx.predicate(lambda f: ctx.flags(f) == 'x', predrepr='exec') @predicate('symlink()') def symlink(mctx, x): """File that is marked as a symlink. """ # i18n: "symlink" is a keyword getargs(x, 0, 0, _("symlink takes no arguments")) ctx = mctx.ctx return mctx.predicate(lambda f: ctx.flags(f) == 'l', predrepr='symlink') @predicate('resolved()', weight=_WEIGHT_STATUS) def resolved(mctx, x): """File that is marked resolved according to :hg:`resolve -l`. """ # i18n: "resolved" is a keyword getargs(x, 0, 0, _("resolved takes no arguments")) if mctx.ctx.rev() is not None: return mctx.never() ms = merge.mergestate.read(mctx.ctx.repo()) return mctx.predicate(lambda f: f in ms and ms[f] == 'r', predrepr='resolved') @predicate('unresolved()', weight=_WEIGHT_STATUS) def unresolved(mctx, x): """File that is marked unresolved according to :hg:`resolve -l`. """ # i18n: "unresolved" is a keyword getargs(x, 0, 0, _("unresolved takes no arguments")) if mctx.ctx.rev() is not None: return mctx.never() ms = merge.mergestate.read(mctx.ctx.repo()) return mctx.predicate(lambda f: f in ms and ms[f] == 'u', predrepr='unresolved') @predicate('hgignore()', weight=_WEIGHT_STATUS) def hgignore(mctx, x): """File that matches the active .hgignore pattern. """ # i18n: "hgignore" is a keyword getargs(x, 0, 0, _("hgignore takes no arguments")) return mctx.ctx.repo().dirstate._ignore @predicate('portable()', weight=_WEIGHT_CHECK_FILENAME) def portable(mctx, x): """File that has a portable name. (This doesn't include filenames with case collisions.) """ # i18n: "portable" is a keyword getargs(x, 0, 0, _("portable takes no arguments")) return mctx.predicate(lambda f: util.checkwinfilename(f) is None, predrepr='portable') @predicate('grep(regex)', weight=_WEIGHT_READ_CONTENTS) def grep(mctx, x): """File contains the given regular expression. """ try: # i18n: "grep" is a keyword r = re.compile(getstring(x, _("grep requires a pattern"))) except re.error as e: raise error.ParseError(_('invalid match pattern: %s') % stringutil.forcebytestr(e)) return mctx.fpredicate(lambda fctx: r.search(fctx.data()), predrepr=('grep(%r)', r.pattern), cache=True) def _sizetomax(s): try: s = s.strip().lower() for k, v in util._sizeunits: if s.endswith(k): # max(4k) = 5k - 1, max(4.5k) = 4.6k - 1 n = s[:-len(k)] inc = 1.0 if "." in n: inc /= 10 ** len(n.split(".")[1]) return int((float(n) + inc) * v) - 1 # no extension, this is a precise value return int(s) except ValueError: raise error.ParseError(_("couldn't parse size: %s") % s) def sizematcher(expr): """Return a function(size) -> bool from the ``size()`` expression""" expr = expr.strip() if '-' in expr: # do we have a range? a, b = expr.split('-', 1) a = util.sizetoint(a) b = util.sizetoint(b) return lambda x: x >= a and x <= b elif expr.startswith("<="): a = util.sizetoint(expr[2:]) return lambda x: x <= a elif expr.startswith("<"): a = util.sizetoint(expr[1:]) return lambda x: x < a elif expr.startswith(">="): a = util.sizetoint(expr[2:]) return lambda x: x >= a elif expr.startswith(">"): a = util.sizetoint(expr[1:]) return lambda x: x > a else: a = util.sizetoint(expr) b = _sizetomax(expr) return lambda x: x >= a and x <= b @predicate('size(expression)', weight=_WEIGHT_STATUS) def size(mctx, x): """File size matches the given expression. Examples: - size('1k') - files from 1024 to 2047 bytes - size('< 20k') - files less than 20480 bytes - size('>= .5MB') - files at least 524288 bytes - size('4k - 1MB') - files from 4096 bytes to 1048576 bytes """ # i18n: "size" is a keyword expr = getstring(x, _("size requires an expression")) m = sizematcher(expr) return mctx.fpredicate(lambda fctx: m(fctx.size()), predrepr=('size(%r)', expr), cache=True) @predicate('encoding(name)', weight=_WEIGHT_READ_CONTENTS) def encoding(mctx, x): """File can be successfully decoded with the given character encoding. May not be useful for encodings other than ASCII and UTF-8. """ # i18n: "encoding" is a keyword enc = getstring(x, _("encoding requires an encoding name")) def encp(fctx): d = fctx.data() try: d.decode(pycompat.sysstr(enc)) return True except LookupError: raise error.Abort(_("unknown encoding '%s'") % enc) except UnicodeDecodeError: return False return mctx.fpredicate(encp, predrepr=('encoding(%r)', enc), cache=True) @predicate('eol(style)', weight=_WEIGHT_READ_CONTENTS) def eol(mctx, x): """File contains newlines of the given style (dos, unix, mac). Binary files are excluded, files with mixed line endings match multiple styles. """ # i18n: "eol" is a keyword enc = getstring(x, _("eol requires a style name")) def eolp(fctx): if fctx.isbinary(): return False d = fctx.data() if (enc == 'dos' or enc == 'win') and '\r\n' in d: return True elif enc == 'unix' and re.search('(?