##// END OF EJS Templates
move walkrepos from util to scmutil
move walkrepos from util to scmutil

File last commit:

r13975:938fbeac default
r13975:938fbeac default
Show More
scmutil.py
297 lines | 10.7 KiB | text/x-python | PythonLexer
# scmutil.py - Mercurial core utility functions
#
# Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from i18n import _
import util, error
import os, errno, stat
def checkfilename(f):
'''Check that the filename f is an acceptable filename for a tracked file'''
if '\r' in f or '\n' in f:
raise util.Abort(_("'\\n' and '\\r' disallowed in filenames: %r") % f)
def checkportable(ui, f):
'''Check if filename f is portable and warn or abort depending on config'''
checkfilename(f)
val = ui.config('ui', 'portablefilenames', 'warn')
lval = val.lower()
abort = os.name == 'nt' or lval == 'abort'
bval = util.parsebool(val)
if abort or lval == 'warn' or bval:
msg = util.checkwinfilename(f)
if msg:
if abort:
raise util.Abort("%s: %r" % (msg, f))
ui.warn(_("warning: %s: %r\n") % (msg, f))
elif bval is None and lval != 'ignore':
raise error.ConfigError(
_("ui.portablefilenames value is invalid ('%s')") % val)
class path_auditor(object):
'''ensure that a filesystem path contains no banned components.
the following properties of a path are checked:
- ends with a directory separator
- under top-level .hg
- starts at the root of a windows drive
- contains ".."
- traverses a symlink (e.g. a/symlink_here/b)
- inside a nested repository (a callback can be used to approve
some nested repositories, e.g., subrepositories)
'''
def __init__(self, root, callback=None):
self.audited = set()
self.auditeddir = set()
self.root = root
self.callback = callback
def __call__(self, path):
'''Check the relative path.
path may contain a pattern (e.g. foodir/**.txt)'''
if path in self.audited:
return
# AIX ignores "/" at end of path, others raise EISDIR.
if util.endswithsep(path):
raise util.Abort(_("path ends in directory separator: %s") % path)
normpath = os.path.normcase(path)
parts = util.splitpath(normpath)
if (os.path.splitdrive(path)[0]
or parts[0].lower() in ('.hg', '.hg.', '')
or os.pardir in parts):
raise util.Abort(_("path contains illegal component: %s") % path)
if '.hg' in path.lower():
lparts = [p.lower() for p in parts]
for p in '.hg', '.hg.':
if p in lparts[1:]:
pos = lparts.index(p)
base = os.path.join(*parts[:pos])
raise util.Abort(_('path %r is inside nested repo %r')
% (path, base))
parts.pop()
prefixes = []
while parts:
prefix = os.sep.join(parts)
if prefix in self.auditeddir:
break
curpath = os.path.join(self.root, prefix)
try:
st = os.lstat(curpath)
except OSError, err:
# EINVAL can be raised as invalid path syntax under win32.
# They must be ignored for patterns can be checked too.
if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
raise
else:
if stat.S_ISLNK(st.st_mode):
raise util.Abort(
_('path %r traverses symbolic link %r')
% (path, prefix))
elif (stat.S_ISDIR(st.st_mode) and
os.path.isdir(os.path.join(curpath, '.hg'))):
if not self.callback or not self.callback(curpath):
raise util.Abort(_('path %r is inside nested repo %r') %
(path, prefix))
prefixes.append(prefix)
parts.pop()
self.audited.add(path)
# only add prefixes to the cache after checking everything: we don't
# want to add "foo/bar/baz" before checking if there's a "foo/.hg"
self.auditeddir.update(prefixes)
class opener(object):
'''Open files relative to a base directory
This class is used to hide the details of COW semantics and
remote file access from higher level code.
'''
def __init__(self, base, audit=True):
self.base = base
if audit:
self.auditor = path_auditor(base)
else:
self.auditor = util.always
self.createmode = None
self._trustnlink = None
@util.propertycache
def _can_symlink(self):
return util.checklink(self.base)
def _fixfilemode(self, name):
if self.createmode is None:
return
os.chmod(name, self.createmode & 0666)
def __call__(self, path, mode="r", text=False, atomictemp=False):
r = util.checkosfilename(path)
if r:
raise util.Abort("%s: %r" % (r, path))
self.auditor(path)
f = os.path.join(self.base, path)
if not text and "b" not in mode:
mode += "b" # for that other OS
nlink = -1
dirname, basename = os.path.split(f)
# If basename is empty, then the path is malformed because it points
# to a directory. Let the posixfile() call below raise IOError.
if basename and mode not in ('r', 'rb'):
if atomictemp:
if not os.path.isdir(dirname):
util.makedirs(dirname, self.createmode)
return util.atomictempfile(f, mode, self.createmode)
try:
if 'w' in mode:
util.unlink(f)
nlink = 0
else:
# nlinks() may behave differently for files on Windows
# shares if the file is open.
fd = util.posixfile(f)
nlink = util.nlinks(f)
if nlink < 1:
nlink = 2 # force mktempcopy (issue1922)
fd.close()
except (OSError, IOError), e:
if e.errno != errno.ENOENT:
raise
nlink = 0
if not os.path.isdir(dirname):
util.makedirs(dirname, self.createmode)
if nlink > 0:
if self._trustnlink is None:
self._trustnlink = nlink > 1 or util.checknlink(f)
if nlink > 1 or not self._trustnlink:
util.rename(util.mktempcopy(f), f)
fp = util.posixfile(f, mode)
if nlink == 0:
self._fixfilemode(f)
return fp
def symlink(self, src, dst):
self.auditor(dst)
linkname = os.path.join(self.base, dst)
try:
os.unlink(linkname)
except OSError:
pass
dirname = os.path.dirname(linkname)
if not os.path.exists(dirname):
util.makedirs(dirname, self.createmode)
if self._can_symlink:
try:
os.symlink(src, linkname)
except OSError, err:
raise OSError(err.errno, _('could not symlink to %r: %s') %
(src, err.strerror), linkname)
else:
f = self(dst, "w")
f.write(src)
f.close()
self._fixfilemode(dst)
def canonpath(root, cwd, myname, auditor=None):
'''return the canonical path of myname, given cwd and root'''
if util.endswithsep(root):
rootsep = root
else:
rootsep = root + os.sep
name = myname
if not os.path.isabs(name):
name = os.path.join(root, cwd, name)
name = os.path.normpath(name)
if auditor is None:
auditor = path_auditor(root)
if name != rootsep and name.startswith(rootsep):
name = name[len(rootsep):]
auditor(name)
return util.pconvert(name)
elif name == root:
return ''
else:
# Determine whether `name' is in the hierarchy at or beneath `root',
# by iterating name=dirname(name) until that causes no change (can't
# check name == '/', because that doesn't work on windows). For each
# `name', compare dev/inode numbers. If they match, the list `rel'
# holds the reversed list of components making up the relative file
# name we want.
root_st = os.stat(root)
rel = []
while True:
try:
name_st = os.stat(name)
except OSError:
break
if util.samestat(name_st, root_st):
if not rel:
# name was actually the same as root (maybe a symlink)
return ''
rel.reverse()
name = os.path.join(*rel)
auditor(name)
return util.pconvert(name)
dirname, basename = os.path.split(name)
rel.append(basename)
if dirname == name:
break
name = dirname
raise util.Abort('%s not under root' % myname)
def walkrepos(path, followsym=False, seen_dirs=None, recurse=False):
'''yield every hg repository under path, recursively.'''
def errhandler(err):
if err.filename == path:
raise err
if followsym and hasattr(os.path, 'samestat'):
def _add_dir_if_not_there(dirlst, dirname):
match = False
samestat = os.path.samestat
dirstat = os.stat(dirname)
for lstdirstat in dirlst:
if samestat(dirstat, lstdirstat):
match = True
break
if not match:
dirlst.append(dirstat)
return not match
else:
followsym = False
if (seen_dirs is None) and followsym:
seen_dirs = []
_add_dir_if_not_there(seen_dirs, path)
for root, dirs, files in os.walk(path, topdown=True, onerror=errhandler):
dirs.sort()
if '.hg' in dirs:
yield root # found a repository
qroot = os.path.join(root, '.hg', 'patches')
if os.path.isdir(os.path.join(qroot, '.hg')):
yield qroot # we have a patch queue repo here
if recurse:
# avoid recursing inside the .hg directory
dirs.remove('.hg')
else:
dirs[:] = [] # don't descend further
elif followsym:
newdirs = []
for d in dirs:
fname = os.path.join(root, d)
if _add_dir_if_not_there(seen_dirs, fname):
if os.path.islink(fname):
for hgname in walkrepos(fname, True, seen_dirs):
yield hgname
else:
newdirs.append(d)
dirs[:] = newdirs