pathutil.py
396 lines
| 12.6 KiB
| text/x-python
|
PythonLexer
/ mercurial / pathutil.py
Matt Harbison
|
r52756 | from __future__ import annotations | ||
Valentin Gatien-Baron
|
r45392 | import contextlib | ||
Gregory Szorc
|
r25964 | import errno | ||
import os | ||||
import posixpath | ||||
import stat | ||||
Arseniy Alekseyev
|
r50801 | from typing import ( | ||
Any, | ||||
Callable, | ||||
Iterator, | ||||
Optional, | ||||
) | ||||
Gregory Szorc
|
r25964 | from .i18n import _ | ||
from . import ( | ||||
encoding, | ||||
Pierre-Yves David
|
r26587 | error, | ||
r43923 | policy, | |||
Pulkit Goyal
|
r30614 | pycompat, | ||
Gregory Szorc
|
r25964 | util, | ||
) | ||||
Augie Fackler
|
r20033 | |||
# Result of asking the module policy for the 'Dirs' item of the Rust
# 'dirstate' module; it is compared against None further down before it is
# allowed to replace the pure-Python ``dirs`` class.
rustdirs = policy.importrust('dirstate', 'Dirs')
# 'parsers' module as selected by the module policy; it is probed with
# hasattr() for a ``dirs`` attribute further down.
parsers = policy.importmod('parsers')
Augie Fackler
|
r43346 | |||
def _lowerclean(s: bytes) -> bytes:
    """Lower-case ``s`` and pass the result through
    ``encoding.hfsignoreclean``.

    Used by the path auditor to compare path components against ``.hg``.
    """
    lowered = s.lower()
    return encoding.hfsignoreclean(lowered)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
class pathauditor:
    """ensure that a filesystem path contains no banned components.

    the following properties of a path are checked:

    - ends with a directory separator
    - under top-level .hg
    - starts at the root of a windows drive
    - contains ".."

    More checks are also done about the file system state:

    - traverses a symlink (e.g. a/symlink_here/b)
    - inside a nested repository (a callback can be used to approve
      some nested repositories, e.g., subrepositories)

    The file system checks are only done when 'realfs' is set to True (the
    default). They should be disabled when we are auditing paths for
    operations on stored history.

    If 'cached' is set to True, audited paths and sub-directories are cached.
    Be careful to not keep the cache of unmanaged directories for long because
    audited paths may be replaced with symlinks.
    """

    def __init__(self, root, callback=None, realfs=True, cached=False):
        # paths that already went through a complete audit
        self.audited = set()
        # directory prefix -> result of the filesystem check for that prefix
        self.auditeddir = {}
        self.root = root
        self._realfs = realfs
        self._cached = cached
        self.callback = callback
        # normcase is the identity unless root exists and its filesystem is
        # reported as case-insensitive
        foldcase = os.path.lexists(root) and not util.fscasesensitive(root)
        self.normcase = util.normcase if foldcase else (lambda x: x)

    def __call__(self, path: bytes, mode: Optional[Any] = None) -> None:
        """Audit the relative ``path``; it may contain a pattern
        (e.g. foodir/**.txt).

        Raises error.InputError when a lexical check fails. When 'realfs' is
        enabled, the filesystem checks may raise error.Abort as well.
        Returns None when the path is acceptable."""
        path = util.localpath(path)
        if path in self.audited:
            return

        # AIX ignores "/" at end of path, others raise EISDIR.
        if util.endswithsep(path):
            raise error.InputError(
                _(b"path ends in directory separator: %s") % path
            )

        parts = util.splitpath(path)
        if (
            os.path.splitdrive(path)[0]
            or _lowerclean(parts[0]) in (b'.hg', b'.hg.', b'')
            or pycompat.ospardir in parts
        ):
            raise error.InputError(
                _(b"path contains illegal component: %s") % path
            )

        # Windows shortname aliases
        if b"~" in path:
            for component in parts:
                stem, tilde, suffix = component.partition(b"~")
                if (
                    tilde
                    and suffix.isdigit()
                    and stem.upper() in (b"HG", b"HG8B6C")
                ):
                    raise error.InputError(
                        _(b"path contains illegal component: %s") % path
                    )

        if b'.hg' in _lowerclean(path):
            lowered = [_lowerclean(component) for component in parts]
            for marker in (b'.hg', b'.hg.'):
                if marker in lowered[1:]:
                    where = lowered.index(marker)
                    base = os.path.join(*parts[:where])
                    raise error.InputError(
                        _(b"path '%s' is inside nested repo %r")
                        % (path, pycompat.bytestr(base))
                    )

        if self._realfs:
            # It's important that we check the path parts starting from the
            # root. We don't want to add "foo/bar/baz" to auditeddir before
            # checking if there's a "foo/.hg" directory. This also means we
            # won't accidentally traverse a symlink into some other
            # filesystem (which is potentially expensive to access).
            for prefix in finddirs_rev_noroot(path):
                if prefix not in self.auditeddir:
                    exists = pathauditor._checkfs_exists(
                        self.root, prefix, path, self.callback
                    )
                    if self._cached:
                        self.auditeddir[prefix] = exists
                else:
                    exists = self.auditeddir[prefix]
                # nothing deeper can exist once a prefix is missing
                if not exists:
                    break

        if self._cached:
            self.audited.add(path)

    @staticmethod
    def _checkfs_exists(
        root,
        prefix: bytes,
        path: bytes,
        callback: Optional[Callable[[bytes], bool]] = None,
    ):
        """raise exception if a file system backed check fails.

        ``prefix`` is joined to ``root`` and inspected with lstat(); ``path``
        is only used in error messages.

        Return a bool that indicates that the directory (or file) exists."""
        curpath = os.path.join(root, prefix)
        try:
            st = os.lstat(curpath)
        except OSError as err:
            if err.errno == errno.ENOENT:
                return False
            # EINVAL can be raised as invalid path syntax under win32.
            # They must be ignored for patterns can be checked too.
            if err.errno in (errno.ENOTDIR, errno.EINVAL):
                return True
            raise

        filemode = st.st_mode
        if stat.S_ISLNK(filemode):
            msg = _(b'path %r traverses symbolic link %r') % (
                pycompat.bytestr(path),
                pycompat.bytestr(prefix),
            )
            raise error.Abort(msg)
        if stat.S_ISDIR(filemode) and os.path.isdir(
            os.path.join(curpath, b'.hg')
        ):
            # a nested repository is only tolerated when the callback
            # explicitly approves it
            if not (callback and callback(curpath)):
                msg = _(b"path '%s' is inside nested repo %r")
                raise error.Abort(msg % (path, pycompat.bytestr(prefix)))
        return True

    def check(self, path: bytes) -> bool:
        """Audit ``path`` and report the outcome as a bool instead of
        letting OSError or error.Abort propagate."""
        try:
            self(path)
        except (OSError, error.Abort):
            return False
        return True

    @contextlib.contextmanager
    def cached(self):
        """Context manager that enables caching for the duration of the
        block.

        When caching is already enabled nothing is changed. Otherwise
        caching is switched on, and on exit both caches are emptied and
        caching is switched off again."""
        if self._cached:
            yield
            return
        try:
            self._cached = True
            yield
        finally:
            self.audited.clear()
            self.auditeddir.clear()
            self._cached = False

    def clear_audit_cache(self):
        """reset all audit cache

        intended for debug and performance benchmark purposes"""
        self.audited.clear()
        self.auditeddir.clear()
Augie Fackler
|
r43346 | |||
def canonpath(
    root: bytes,
    cwd: bytes,
    myname: bytes,
    auditor: Optional[pathauditor] = None,
) -> bytes:
    """return the canonical path of myname, given cwd and root

    The result is relative to root and converted with util.pconvert(); b''
    is returned when myname designates root itself. The relative name is
    passed to ``auditor`` (a pathauditor for root is created when none is
    given). error.Abort is raised when myname is not under root.

    >>> def check(root, cwd, myname):
    ...     a = pathauditor(root, realfs=False)
    ...     try:
    ...         return canonpath(root, cwd, myname, a)
    ...     except error.Abort:
    ...         return 'aborted'
    >>> def unixonly(root, cwd, myname, expected='aborted'):
    ...     if pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> def winonly(root, cwd, myname, expected='aborted'):
    ...     if not pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> winonly(b'd:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'repo\\\\filename',
    ...         b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo', b'filename', b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo\\\\subdir', b'filename',
    ...         b'subdir/filename')
    'subdir/filename'
    >>> unixonly(b'/repo', b'/dir', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'repo/filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo', b'filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo/subdir', b'filename', b'subdir/filename')
    'subdir/filename'
    """
    rootsep = root if util.endswithsep(root) else root + pycompat.ossep

    name = myname
    if not os.path.isabs(name):
        name = os.path.join(root, cwd, name)
    name = os.path.normpath(name)

    if auditor is None:
        auditor = pathauditor(root)

    # Fast path: the normalized name textually starts with "root/".
    if name != rootsep and name.startswith(rootsep):
        name = name[len(rootsep) :]
        auditor(name)
        return util.pconvert(name)

    if name == root:
        return b''

    # Determine whether `name' is in the hierarchy at or beneath `root',
    # by iterating name=dirname(name) until that causes no change (can't
    # check name == '/', because that doesn't work on windows). The list
    # `components' holds the reversed list of components making up the
    # relative file name we want.
    components = []
    while True:
        try:
            atroot = util.samefile(name, root)
        except OSError:
            atroot = False
        if atroot:
            if not components:
                # name was actually the same as root (maybe a symlink)
                return b''
            name = os.path.join(*reversed(components))
            auditor(name)
            return util.pconvert(name)
        parent, leaf = util.split(name)
        components.append(leaf)
        if parent == name:
            break
        name = parent

    # A common mistake is to use -R, but specify a file relative to the repo
    # instead of cwd. Detect that case, and provide a hint to the user.
    hint = None
    if cwd != root:
        try:
            canonpath(root, root, myname, auditor)
            relpath = util.pathto(root, cwd, b'')
            if relpath.endswith(pycompat.ossep):
                relpath = relpath[:-1]
            hint = _(b"consider using '--cwd %s'") % relpath
        except error.Abort:
            pass

    raise error.Abort(
        _(b"%s not under root '%s'") % (myname, root), hint=hint
    )
FUJIWARA Katsunori
|
r21568 | |||
def normasprefix(path: bytes) -> bytes:
    """normalize the specified path as path prefix

    The path is returned with a trailing ``pycompat.ossep`` appended, unless
    what remains after the drive part already has the length of a lone
    separator, in which case it is returned unchanged.

    Returned value can be used safely for "p.startswith(prefix)",
    "p[len(prefix):]", and so on.

    For efficiency, this expects "path" argument to be already
    normalized by "os.path.normpath", "os.path.realpath", and so on.

    See also issue3033 for detail about need of this function.

    >>> normasprefix(b'/foo/bar').replace(pycompat.ossep, b'/')
    '/foo/bar/'
    >>> normasprefix(b'/').replace(pycompat.ossep, b'/')
    '/'
    """
    _drive, tail = os.path.splitdrive(path)
    sep = pycompat.ossep
    if len(tail) == len(sep):
        return path
    return path + sep
Durham Goode
|
r25281 | |||
Augie Fackler
|
r43346 | |||
def finddirs(path: bytes) -> Iterator[bytes]:
    """Yield every prefix of ``path`` that ends just before a b'/',
    longest first, followed by b'' as the final item."""
    end = len(path)
    while True:
        end = path.rfind(b'/', 0, end)
        if end == -1:
            break
        yield path[:end]
    yield b''
Arseniy Alekseyev
|
def finddirs_rev_noroot(path: bytes) -> Iterator[bytes]:
    """Yield every prefix of ``path`` that ends just before an occurrence
    of ``pycompat.ossep``, shortest first.

    Nothing is yielded when ``path`` contains no separator."""
    sep = pycompat.ossep
    start = 0
    while True:
        cut = path.find(sep, start)
        if cut == -1:
            return
        yield path[:cut]
        start = cut + 1
Gregory Szorc
|
class dirs:
    '''a multiset of directory names from a set of file paths'''

    def __init__(self, map, only_tracked=False):
        """
        a dict map indicates a dirstate while a list indicates a manifest

        With ``only_tracked`` set, ``map`` must be a dict and entries whose
        ``state`` is b'r' are skipped; passing ``only_tracked`` with any
        other kind of source raises error.ProgrammingError.
        """
        # directory name -> number of added paths located beneath it
        self._dirs = {}
        addpath = self.addpath
        if isinstance(map, dict) and only_tracked:
            for f, s in map.items():
                if s.state != b'r':
                    addpath(f)
        elif only_tracked:
            msg = b"`only_tracked` is only supported with a dict source"
            raise error.ProgrammingError(msg)
        else:
            for f in map:
                addpath(f)

    def addpath(self, path: bytes) -> None:
        """Account for ``path`` in every directory that contains it.

        Walks the parents from the deepest one upwards and stops at the
        first directory already known: its counter is bumped, and its own
        ancestors are left untouched.

        Raises ValueError when a parent ends with b'/', i.e. ``path`` holds
        consecutive slashes. In that case the entries inserted by this call
        are removed again, so a rejected path leaves the multiset exactly as
        it was before the call.
        """
        dirs = self._dirs
        inserted = []
        for base in finddirs(path):
            if base.endswith(b'/'):
                # Undo the partial insertion before reporting the error.
                # Each of these keys was created by this call with a count
                # of 1, so deleting it restores the previous state.
                for added in inserted:
                    del dirs[added]
                raise ValueError(
                    "found invalid consecutive slashes in path: %r" % base
                )
            if base in dirs:
                dirs[base] += 1
                return
            dirs[base] = 1
            inserted.append(base)

    def delpath(self, path: bytes) -> None:
        """Undo one ``addpath(path)``.

        Walks the parents from the deepest one upwards: a directory whose
        counter is above 1 is decremented and the walk stops; a directory
        whose counter is 1 is dropped and the walk continues. A parent
        missing from the multiset raises KeyError.
        """
        dirs = self._dirs
        for base in finddirs(path):
            if dirs[base] > 1:
                dirs[base] -= 1
                return
            del dirs[base]

    def __iter__(self):
        """Iterate over the directory names currently in the multiset."""
        return iter(self._dirs)

    def __contains__(self, d: bytes) -> bool:
        """Tell whether directory ``d`` is currently in the multiset."""
        return d in self._dirs
# Rebind ``dirs`` to the implementation exposed by the ``parsers`` module
# when it has one, then to ``rustdirs`` when that import produced something;
# the latter wins when both are available.
if hasattr(parsers, 'dirs'):
    dirs = parsers.dirs
if rustdirs is not None:
    dirs = rustdirs
Augie Fackler
|
# Forward two functions from posixpath that do what we need, but we'd
# rather not let our internals know that we're thinking in posix terms
# - instead we'll let them be oblivious.
join = posixpath.join
dirname: Callable[[bytes], bytes] = posixpath.dirname