##// END OF EJS Templates
typing: use python3-style type annotation
Arseniy Alekseyev -
r50785:5f664401 default
parent child Browse files
Show More
@@ -1,381 +1,380 b''
1 import contextlib
1 import contextlib
2 import errno
2 import errno
3 import os
3 import os
4 import posixpath
4 import posixpath
5 import stat
5 import stat
6
6
7 from .i18n import _
7 from .i18n import _
8 from . import (
8 from . import (
9 encoding,
9 encoding,
10 error,
10 error,
11 policy,
11 policy,
12 pycompat,
12 pycompat,
13 util,
13 util,
14 )
14 )
15
15
16 if pycompat.TYPE_CHECKING:
16 if pycompat.TYPE_CHECKING:
17 from typing import (
17 from typing import (
18 Any,
18 Any,
19 Callable,
19 Callable,
20 Iterator,
20 Iterator,
21 Optional,
21 Optional,
22 )
22 )
23
23
24
24
# Optional alternative implementations, resolved through the module policy.
# `rustdirs` is compared against None further down in this file before it is
# used, so it may be unavailable.
rustdirs = policy.importrust('dirstate', 'Dirs')
parsers = policy.importmod('parsers')
27
27
28
28
def _lowerclean(s: bytes) -> bytes:
    """Return ``s`` lower-cased and passed through ``encoding.hfsignoreclean``.

    Used to compare path components against ``.hg`` in a normalized form.
    """
    return encoding.hfsignoreclean(s.lower())
32
32
33
33
class pathauditor:
    """ensure that a filesystem path contains no banned components.
    the following properties of a path are checked:

    - ends with a directory separator
    - under top-level .hg
    - starts at the root of a windows drive
    - contains ".."

    More checks are also done about the file system state:
    - traverses a symlink (e.g. a/symlink_here/b)
    - inside a nested repository (a callback can be used to approve
      some nested repositories, e.g., subrepositories)

    The file system checks are only done when 'realfs' is set to True (the
    default). They should be disabled when we are auditing paths for
    operations on stored history.

    If 'cached' is set to True, audited paths and sub-directories are cached.
    Be careful to not keep the cache of unmanaged directories for long because
    audited paths may be replaced with symlinks.
    """

    def __init__(self, root, callback=None, realfs=True, cached=False):
        """Create an auditor for paths relative to ``root``.

        ``callback``, when given, is called with the full path of a directory
        that contains a ``.hg`` directory; a true return value approves that
        nested repository.
        """
        # paths that already passed a full audit (only filled when caching)
        self.audited = set()
        # directory prefix -> result of _checkfs_exists (only when caching)
        self.auditeddir = dict()
        self.root = root
        self._realfs = realfs
        self._cached = cached
        self.callback = callback
        if os.path.lexists(root) and not util.fscasesensitive(root):
            self.normcase = util.normcase
        else:
            self.normcase = lambda x: x

    def __call__(self, path, mode=None):
        # type: (bytes, Optional[Any]) -> None
        """Check the relative path.
        path may contain a pattern (e.g. foodir/**.txt)

        Raises error.InputError for a banned component and error.Abort when
        a file system check fails. ``mode`` is accepted but not used here.
        """

        path = util.localpath(path)
        if path in self.audited:
            return
        # AIX ignores "/" at end of path, others raise EISDIR.
        if util.endswithsep(path):
            raise error.InputError(
                _(b"path ends in directory separator: %s") % path
            )
        parts = util.splitpath(path)
        if (
            os.path.splitdrive(path)[0]
            or _lowerclean(parts[0]) in (b'.hg', b'.hg.', b'')
            or pycompat.ospardir in parts
        ):
            raise error.InputError(
                _(b"path contains illegal component: %s") % path
            )
        # Windows shortname aliases
        if b"~" in path:
            for p in parts:
                if b"~" in p:
                    first, last = p.split(b"~", 1)
                    if last.isdigit() and first.upper() in (b"HG", b"HG8B6C"):
                        raise error.InputError(
                            _(b"path contains illegal component: %s") % path
                        )
        if b'.hg' in _lowerclean(path):
            lparts = [_lowerclean(p) for p in parts]
            for p in b'.hg', b'.hg.':
                if p in lparts[1:]:
                    pos = lparts.index(p)
                    base = os.path.join(*parts[:pos])
                    raise error.InputError(
                        _(b"path '%s' is inside nested repo %r")
                        % (path, pycompat.bytestr(base))
                    )

        if self._realfs:
            # only the parent directories are checked on disk, not the leaf
            parts.pop()
            # It's important that we check the path parts starting from the
            # root. We don't want to add "foo/bar/baz" to auditeddir before
            # checking if there's a "foo/.hg" directory. This also means we
            # won't accidentally traverse a symlink into some other
            # filesystem (which is potentially expensive to access).
            for end in range(1, len(parts) + 1):
                prefix = pycompat.ossep.join(parts[:end])
                if prefix in self.auditeddir:
                    res = self.auditeddir[prefix]
                else:
                    res = self._checkfs_exists(prefix, path)
                    if self._cached:
                        self.auditeddir[prefix] = res
                if not res:
                    # a missing directory cannot have anything below it
                    break

        if self._cached:
            self.audited.add(path)

    def _checkfs_exists(self, prefix: bytes, path: bytes) -> bool:
        """raise exception if a file system backed check fails.

        Return a bool that indicates that the directory (or file) exists."""
        curpath = os.path.join(self.root, prefix)
        try:
            st = os.lstat(curpath)
        except OSError as err:
            if err.errno == errno.ENOENT:
                return False
            # EINVAL can be raised as invalid path syntax under win32.
            # They must be ignored for patterns can be checked too.
            if err.errno not in (errno.ENOTDIR, errno.EINVAL):
                raise
        else:
            if stat.S_ISLNK(st.st_mode):
                msg = _(b'path %r traverses symbolic link %r') % (
                    pycompat.bytestr(path),
                    pycompat.bytestr(prefix),
                )
                raise error.Abort(msg)
            elif stat.S_ISDIR(st.st_mode) and os.path.isdir(
                os.path.join(curpath, b'.hg')
            ):
                if not self.callback or not self.callback(curpath):
                    msg = _(b"path '%s' is inside nested repo %r")
                    raise error.Abort(msg % (path, pycompat.bytestr(prefix)))
        return True

    def check(self, path: bytes) -> bool:
        """Return True if ``path`` passes the audit, False if auditing it
        raised OSError or error.Abort."""
        try:
            self(path)
            return True
        except (OSError, error.Abort):
            return False

    @contextlib.contextmanager
    def cached(self):
        """Context manager that enables caching for the duration of the block.

        If caching is already on, nothing changes. Otherwise caching is
        switched on, and on exit both caches are cleared and caching is
        switched off again.
        """
        if self._cached:
            yield
        else:
            try:
                self._cached = True
                yield
            finally:
                self.audited.clear()
                self.auditeddir.clear()
                self._cached = False
182
181
183
182
def canonpath(root, cwd, myname, auditor=None):
    # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes
    """return the canonical path of myname, given cwd and root

    The result is relative to root and passed through util.pconvert. An
    error.Abort is raised when myname is not located under root. When no
    auditor is given, a pathauditor for root is created.

    >>> def check(root, cwd, myname):
    ...     a = pathauditor(root, realfs=False)
    ...     try:
    ...         return canonpath(root, cwd, myname, a)
    ...     except error.Abort:
    ...         return 'aborted'
    >>> def unixonly(root, cwd, myname, expected='aborted'):
    ...     if pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> def winonly(root, cwd, myname, expected='aborted'):
    ...     if not pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> winonly(b'd:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'repo\\\\filename',
    ...         b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo', b'filename', b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo\\\\subdir', b'filename',
    ...         b'subdir/filename')
    'subdir/filename'
    >>> unixonly(b'/repo', b'/dir', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'repo/filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo', b'filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo/subdir', b'filename', b'subdir/filename')
    'subdir/filename'
    """
    if util.endswithsep(root):
        rootsep = root
    else:
        rootsep = root + pycompat.ossep
    name = myname
    if not os.path.isabs(name):
        name = os.path.join(root, cwd, name)
    name = os.path.normpath(name)
    if auditor is None:
        auditor = pathauditor(root)
    if name != rootsep and name.startswith(rootsep):
        # fast path: purely textual match against the root prefix
        name = name[len(rootsep) :]
        auditor(name)
        return util.pconvert(name)
    elif name == root:
        return b''
    else:
        # Determine whether `name' is in the hierarchy at or beneath `root',
        # by iterating name=dirname(name) until that causes no change (can't
        # check name == '/', because that doesn't work on windows). The list
        # `rel' holds the reversed list of components making up the relative
        # file name we want.
        rel = []
        while True:
            try:
                s = util.samefile(name, root)
            except OSError:
                s = False
            if s:
                if not rel:
                    # name was actually the same as root (maybe a symlink)
                    return b''
                rel.reverse()
                name = os.path.join(*rel)
                auditor(name)
                return util.pconvert(name)
            dirname, basename = util.split(name)
            rel.append(basename)
            if dirname == name:
                # reached the top of the hierarchy without meeting root
                break
            name = dirname

        # A common mistake is to use -R, but specify a file relative to the repo
        # instead of cwd. Detect that case, and provide a hint to the user.
        hint = None
        try:
            if cwd != root:
                # this call raises error.Abort unless myname resolves
                # relative to root, in which case the hint below is set
                canonpath(root, root, myname, auditor)
                relpath = util.pathto(root, cwd, b'')
                if relpath.endswith(pycompat.ossep):
                    relpath = relpath[:-1]
                hint = _(b"consider using '--cwd %s'") % relpath
        except error.Abort:
            pass

        raise error.Abort(
            _(b"%s not under root '%s'") % (myname, root), hint=hint
        )
285
284
286
285
def normasprefix(path: bytes) -> bytes:
    """normalize the specified path as path prefix

    Returned value can be used safely for "p.startswith(prefix)",
    "p[len(prefix):]", and so on.

    For efficiency, this expects "path" argument to be already
    normalized by "os.path.normpath", "os.path.realpath", and so on.

    See also issue3033 for detail about need of this function.

    >>> normasprefix(b'/foo/bar').replace(pycompat.ossep, b'/')
    '/foo/bar/'
    >>> normasprefix(b'/').replace(pycompat.ossep, b'/')
    '/'
    """
    d, p = os.path.splitdrive(path)
    # When the part after the drive is exactly as long as one separator, the
    # path is returned unchanged; otherwise a separator is appended.
    if len(p) != len(pycompat.ossep):
        return path + pycompat.ossep
    else:
        return path
309
308
310
309
def finddirs(path):
    # type: (bytes) -> Iterator[bytes]
    """Yield every ancestor directory of a '/'-separated path.

    Directories are produced from the deepest one up, and the empty bytes
    string is always yielded last.
    """
    pos = path.rfind(b'/')
    while pos != -1:
        yield path[:pos]
        # continue the search strictly to the left of the slash just found
        pos = path.rfind(b'/', 0, pos)
    yield b''
318
317
319
318
class dirs:
    '''a multiset of directory names from a set of file paths'''

    def __init__(self, map, only_tracked=False):
        """
        a dict map indicates a dirstate while a list indicates a manifest

        With ``only_tracked`` set, ``map`` must be a dict and entries whose
        ``state`` is b'r' are skipped; any other source raises
        error.ProgrammingError.
        """
        # directory name -> counter maintained by addpath()/delpath()
        self._dirs = {}
        addpath = self.addpath
        if isinstance(map, dict) and only_tracked:
            for f, s in map.items():
                if s.state != b'r':
                    addpath(f)
        elif only_tracked:
            msg = b"`only_tracked` is only supported with a dict source"
            raise error.ProgrammingError(msg)
        else:
            for f in map:
                addpath(f)

    def addpath(self, path: bytes) -> None:
        """Record the ancestor directories of ``path``.

        Raises ValueError if an ancestor ends with a slash, i.e. the path
        contains consecutive slashes.
        """
        dirs = self._dirs
        for base in finddirs(path):
            if base.endswith(b'/'):
                raise ValueError(
                    "found invalid consecutive slashes in path: %r" % base
                )
            if base in dirs:
                # Known directory: bump its counter and stop; shallower
                # ancestors were recorded when this one was first added.
                dirs[base] += 1
                return
            dirs[base] = 1

    def delpath(self, path: bytes) -> None:
        """Undo one addpath() of ``path``.

        Raises KeyError if an ancestor directory is not present.
        """
        dirs = self._dirs
        for base in finddirs(path):
            if dirs[base] > 1:
                # Still referenced: decrement and leave shallower
                # ancestors untouched, mirroring addpath().
                dirs[base] -= 1
                return
            del dirs[base]

    def __iter__(self):
        return iter(self._dirs)

    def __contains__(self, d: bytes) -> bool:
        return d in self._dirs
368
367
369
368
# Replace the pure-Python `dirs` class with an alternative implementation
# when one is available. The Rust one is assigned last, so it wins over the
# one from `parsers` when both exist.
if util.safehasattr(parsers, 'dirs'):
    dirs = parsers.dirs

if rustdirs is not None:
    dirs = rustdirs
375
374
376
375
# forward two methods from posixpath that do what we need, but we'd
# rather not let our internals know that we're thinking in posix terms
# - instead we'll let them be oblivious.
join = posixpath.join
dirname = posixpath.dirname  # type: Callable[[bytes], bytes]
General Comments 0
You need to be logged in to leave comments. Login now