Show More
pathutil.py
214 lines
| 7.5 KiB
| text/x-python
|
PythonLexer
/ mercurial / pathutil.py
Gregory Szorc
|
r25964 | from __future__ import absolute_import | ||
Augie Fackler
|
r20033 | |||
Gregory Szorc
|
r25964 | import errno | ||
import os | ||||
import posixpath | ||||
import stat | ||||
from .i18n import _ | ||||
from . import ( | ||||
encoding, | ||||
Pierre-Yves David
|
r26587 | error, | ||
Gregory Szorc
|
r25964 | util, | ||
) | ||||
Augie Fackler
|
r20033 | |||
Augie Fackler
|
def _lowerclean(s):
    '''return ``s`` lowercased and passed through encoding.hfsignoreclean

    Used by the path auditor to compare path components against reserved
    names such as ".hg" (presumably so that characters ignored by HFS+
    cannot be used to disguise them -- see encoding.hfsignoreclean).
    '''
    return encoding.hfsignoreclean(s.lower())
Augie Fackler
|
class pathauditor(object):
    '''ensure that a filesystem path contains no banned components.

    the following properties of a path are checked:

    - ends with a directory separator
    - under top-level .hg
    - starts at the root of a windows drive
    - contains ".."

    More checks are also done about the file system state:

    - traverses a symlink (e.g. a/symlink_here/b)
    - inside a nested repository (a callback can be used to approve
      some nested repositories, e.g., subrepositories)

    The file system checks are only done when 'realfs' is set to True (the
    default). They should be disabled when we are auditing paths for
    operations on stored history.
    '''

    def __init__(self, root, callback=None, realfs=True):
        '''create an auditor for paths relative to ``root``

        ``callback``, when given, is called with the full path of a
        directory that contains ".hg"; a true result approves that nested
        repository. ``realfs`` enables the file system backed checks.
        '''
        # caches of (case-normalized) paths and directory prefixes that
        # already passed the audit, so they are not checked again
        self.audited = set()
        self.auditeddir = set()
        self.root = root
        self._realfs = realfs
        self.callback = callback
        # only fold case for the caches when root exists and
        # util.checkcase() reports false (presumably a case-insensitive
        # file system -- confirm against util.checkcase)
        if os.path.lexists(root) and not util.checkcase(root):
            self.normcase = util.normcase
        else:
            self.normcase = lambda x: x

    def __call__(self, path):
        '''Check the relative path.

        path may contain a pattern (e.g. foodir/**.txt)

        Returns None when the path is acceptable and raises error.Abort
        otherwise. Accepted paths and their directory prefixes are cached.
        '''
        path = util.localpath(path)
        normpath = self.normcase(path)
        if normpath in self.audited:
            return
        # AIX ignores "/" at end of path, others raise EISDIR.
        if util.endswithsep(path):
            raise error.Abort(_("path ends in directory separator: %s") % path)
        parts = util.splitpath(path)
        if (os.path.splitdrive(path)[0]
            or _lowerclean(parts[0]) in ('.hg', '.hg.', '')
            or os.pardir in parts):
            raise error.Abort(_("path contains illegal component: %s") % path)
        # Windows shortname aliases
        for p in parts:
            if "~" in p:
                first, last = p.split("~", 1)
                if last.isdigit() and first.upper() in ("HG", "HG8B6C"):
                    raise error.Abort(_("path contains illegal component: %s")
                                      % path)
        if '.hg' in _lowerclean(path):
            # _lowerclean() already lowercases each component
            lparts = [_lowerclean(p) for p in parts]
            for p in '.hg', '.hg.':
                # a leading ".hg" was rejected above, so only look deeper
                if p in lparts[1:]:
                    pos = lparts.index(p)
                    base = os.path.join(*parts[:pos])
                    raise error.Abort(_("path '%s' is inside nested repo %r")
                                      % (path, base))

        normparts = util.splitpath(normpath)
        assert len(parts) == len(normparts)

        # drop the final component: only directory prefixes are checked below
        parts.pop()
        normparts.pop()
        prefixes = []
        # It's important that we check the path parts starting from the root.
        # This means we won't accidentally traverse a symlink into some other
        # filesystem (which is potentially expensive to access).
        for i in range(len(parts)):
            prefix = os.sep.join(parts[:i + 1])
            normprefix = os.sep.join(normparts[:i + 1])
            if normprefix in self.auditeddir:
                continue
            if self._realfs:
                self._checkfs(prefix, path)
            prefixes.append(normprefix)

        self.audited.add(normpath)
        # only add prefixes to the cache after checking everything: we don't
        # want to add "foo/bar/baz" before checking if there's a "foo/.hg"
        self.auditeddir.update(prefixes)

    def _checkfs(self, prefix, path):
        """raise exception if a file system backed check fails

        ``prefix`` is a directory prefix of ``path``, relative to the
        auditor root. error.Abort is raised when the prefix is a symbolic
        link, or a directory containing ".hg" that the callback does not
        approve. A prefix that does not exist is not an error.
        """
        curpath = os.path.join(self.root, prefix)
        try:
            st = os.lstat(curpath)
        except OSError as err:
            # EINVAL can be raised as invalid path syntax under win32.
            # They must be ignored for patterns can be checked too.
            if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
                raise
        else:
            if stat.S_ISLNK(st.st_mode):
                msg = _('path %r traverses symbolic link %r') % (path, prefix)
                raise error.Abort(msg)
            elif (stat.S_ISDIR(st.st_mode) and
                  os.path.isdir(os.path.join(curpath, '.hg'))):
                if not self.callback or not self.callback(curpath):
                    msg = _("path '%s' is inside nested repo %r")
                    raise error.Abort(msg % (path, prefix))

    def check(self, path):
        '''return True if ``path`` passes the audit, False otherwise

        Non-raising variant of calling the auditor: OSError and
        error.Abort from the audit are turned into a False result.
        '''
        try:
            self(path)
            return True
        except (OSError, error.Abort):
            return False
def canonpath(root, cwd, myname, auditor=None):
    '''return the canonical path of myname, given cwd and root

    ``myname`` is resolved against ``root`` and ``cwd`` when it is not
    absolute, normalized, and made relative to ``root``. The result is
    checked with ``auditor`` (a pathauditor for ``root`` is created when
    none is given) and returned through util.pconvert. An empty string is
    returned when the name designates ``root`` itself.

    error.Abort is raised when the name is not under ``root``, or when the
    auditor rejects it.
    '''
    if util.endswithsep(root):
        rootsep = root
    else:
        rootsep = root + os.sep
    name = myname
    if not os.path.isabs(name):
        name = os.path.join(root, cwd, name)
    name = os.path.normpath(name)
    if auditor is None:
        auditor = pathauditor(root)
    if name != rootsep and name.startswith(rootsep):
        name = name[len(rootsep):]
        auditor(name)
        return util.pconvert(name)
    elif name == root:
        return ''
    else:
        # Determine whether `name' is in the hierarchy at or beneath `root',
        # by iterating name=dirname(name) until that causes no change (can't
        # check name == '/', because that doesn't work on windows). The list
        # `rel' holds the reversed list of components making up the relative
        # file name we want.
        rel = []
        while True:
            try:
                s = util.samefile(name, root)
            except OSError:
                s = False
            if s:
                if not rel:
                    # name was actually the same as root (maybe a symlink)
                    return ''
                rel.reverse()
                name = os.path.join(*rel)
                auditor(name)
                return util.pconvert(name)
            # local names chosen so the module-level dirname alias is not
            # shadowed
            parent, leaf = util.split(name)
            rel.append(leaf)
            if parent == name:
                break
            name = parent

        # A common mistake is to use -R, but specify a file relative to the repo
        # instead of cwd. Detect that case, and provide a hint to the user.
        hint = None
        try:
            if cwd != root:
                canonpath(root, root, myname, auditor)
                try:
                    relroot = os.path.relpath(root, cwd)
                except ValueError:
                    # os.path.relpath() cannot relate paths on different
                    # Windows drives; suggest the root as given instead of
                    # letting the hint computation mask the abort below
                    relroot = root
                hint = _("consider using '--cwd %s'") % relroot
        except error.Abort:
            pass

        raise error.Abort(_("%s not under root '%s'") % (myname, root),
                          hint=hint)
FUJIWARA Katsunori
|
r21568 | |||
def normasprefix(path):
    '''normalize the specified path as path prefix

    Returned value can be used safely for "p.startswith(prefix)",
    "p[len(prefix):]", and so on.

    For efficiency, this expects "path" argument to be already
    normalized by "os.path.normpath", "os.path.realpath", and so on.

    See also issue3033 for detail about need of this function.

    >>> normasprefix('/foo/bar').replace(os.sep, '/')
    '/foo/bar/'
    >>> normasprefix('/').replace(os.sep, '/')
    '/'
    >>> normasprefix('a').replace(os.sep, '/')
    'a/'
    '''
    # Only a path whose drive-less part is exactly a separator (a root
    # directory) already ends the way a prefix must. Comparing lengths
    # instead would also match any other one-character path such as 'a'.
    p = os.path.splitdrive(path)[1]
    if p in (os.sep, os.altsep):
        return path
    return path + os.sep
Durham Goode
|
r25281 | |||
Augie Fackler
|
# forward two methods from posixpath that do what we need, but we'd
# rather not let our internals know that we're thinking in posix terms
# - instead we'll let them be oblivious.
join = posixpath.join
dirname = posixpath.dirname