##// END OF EJS Templates
pathutil: slightly faster path audit in the common case
Arseniy Alekseyev -
r50779:44deb5a1 default
parent child Browse files
Show More
@@ -1,378 +1,379 b''
1 1 import contextlib
2 2 import errno
3 3 import os
4 4 import posixpath
5 5 import stat
6 6
7 7 from .i18n import _
8 8 from . import (
9 9 encoding,
10 10 error,
11 11 policy,
12 12 pycompat,
13 13 util,
14 14 )
15 15
16 16 if pycompat.TYPE_CHECKING:
17 17 from typing import (
18 18 Any,
19 19 Callable,
20 20 Iterator,
21 21 Optional,
22 22 )
23 23
24 24
# Alternative providers for ``dirs``, loaded through the ``policy`` module.
# ``rustdirs`` may be None (it is compared against None before use at the
# bottom of this module); ``parsers`` is probed there for a ``dirs``
# attribute.
rustdirs = policy.importrust('dirstate', 'Dirs')
parsers = policy.importmod('parsers')
27 27
28 28
def _lowerclean(s):
    # type: (bytes) -> bytes
    """Lowercase ``s`` and pass it through ``encoding.hfsignoreclean``.

    Used to compare path components against ``.hg`` spellings.
    NOTE(review): presumably hfsignoreclean drops characters that HFS+
    ignores -- confirm against the ``encoding`` module.
    """
    lowered = s.lower()
    return encoding.hfsignoreclean(lowered)
32 32
33 33
class pathauditor:
    """ensure that a filesystem path contains no banned components.

    The following properties of a path are checked:

    - ends with a directory separator
    - under top-level .hg
    - starts at the root of a windows drive
    - contains ".."

    More checks are also done about the file system state:

    - traverses a symlink (e.g. a/symlink_here/b)
    - inside a nested repository (a callback can be used to approve
      some nested repositories, e.g., subrepositories)

    The file system checks are only done when 'realfs' is set to True (the
    default). They should be disabled when we are auditing paths for
    operations on stored history.

    If 'cached' is set to True, audited paths and sub-directories are cached.
    Be careful to not keep the cache of unmanaged directories for long because
    audited paths may be replaced with symlinks.
    """

    def __init__(self, root, callback=None, realfs=True, cached=False):
        """Create an auditor for paths relative to ``root``.

        ``callback``, when given, is called with the full path of a
        directory containing ``.hg``; a true result approves that nested
        repository. ``realfs`` enables the file system checks and ``cached``
        enables remembering already audited paths.
        """
        # normalized paths that already passed a full audit
        self.audited = set()
        # normalized directory prefixes that already passed the fs checks
        self.auditeddir = set()
        self.root = root
        self._realfs = realfs
        self._cached = cached
        self.callback = callback
        # Cache keys are case-normalized only when the root exists and its
        # file system is reported as case-insensitive.
        if os.path.lexists(root) and not util.fscasesensitive(root):
            self.normcase = util.normcase
        else:
            self.normcase = lambda x: x

    def __call__(self, path, mode=None):
        # type: (bytes, Optional[Any]) -> None
        """Check the relative path.

        path may contain a pattern (e.g. foodir/**.txt)

        Raises error.InputError for a lexically invalid path; the file
        system checks (see _checkfs) raise error.Abort. ``mode`` is
        accepted but not used here.
        """

        path = util.localpath(path)
        normpath = self.normcase(path)
        if normpath in self.audited:
            return
        # AIX ignores "/" at end of path, others raise EISDIR.
        if util.endswithsep(path):
            raise error.InputError(
                _(b"path ends in directory separator: %s") % path
            )
        parts = util.splitpath(path)
        if (
            os.path.splitdrive(path)[0]
            or _lowerclean(parts[0]) in (b'.hg', b'.hg.', b'')
            or pycompat.ospardir in parts
        ):
            raise error.InputError(
                _(b"path contains illegal component: %s") % path
            )
        # Windows shortname aliases
        # A single substring test on the whole path skips the per-component
        # loop in the common case where no "~" is present at all.
        if b"~" in path:
            for p in parts:
                if b"~" in p:
                    first, last = p.split(b"~", 1)
                    if last.isdigit() and first.upper() in [b"HG", b"HG8B6C"]:
                        raise error.InputError(
                            _(b"path contains illegal component: %s") % path
                        )
        if b'.hg' in _lowerclean(path):
            lparts = [_lowerclean(p) for p in parts]
            for p in b'.hg', b'.hg.':
                # index 0 was already rejected above, so only look deeper
                if p in lparts[1:]:
                    pos = lparts.index(p)
                    base = os.path.join(*parts[:pos])
                    raise error.InputError(
                        _(b"path '%s' is inside nested repo %r")
                        % (path, pycompat.bytestr(base))
                    )

        normparts = util.splitpath(normpath)
        assert len(parts) == len(normparts)

        # Only the directory prefixes are checked below, not the leaf itself.
        parts.pop()
        normparts.pop()
        # It's important that we check the path parts starting from the root.
        # We don't want to add "foo/bar/baz" to auditeddir before checking if
        # there's a "foo/.hg" directory. This also means we won't accidentally
        # traverse a symlink into some other filesystem (which is potentially
        # expensive to access).
        for i in range(len(parts)):
            prefix = pycompat.ossep.join(parts[: i + 1])
            normprefix = pycompat.ossep.join(normparts[: i + 1])
            if normprefix in self.auditeddir:
                continue
            if self._realfs:
                self._checkfs(prefix, path)
            if self._cached:
                self.auditeddir.add(normprefix)

        if self._cached:
            self.audited.add(normpath)

    def _checkfs(self, prefix, path):
        # type: (bytes, bytes) -> None
        """raise exception if a file system backed check fails

        ``prefix`` is the directory prefix of ``path`` being examined,
        relative to ``self.root``. error.Abort is raised when the prefix is
        a symbolic link, or a directory holding ``.hg`` that the callback
        does not approve.
        """
        curpath = os.path.join(self.root, prefix)
        try:
            st = os.lstat(curpath)
        except OSError as err:
            # EINVAL can be raised as invalid path syntax under win32.
            # They must be ignored for patterns can be checked too.
            if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
                raise
        else:
            if stat.S_ISLNK(st.st_mode):
                msg = _(b'path %r traverses symbolic link %r') % (
                    pycompat.bytestr(path),
                    pycompat.bytestr(prefix),
                )
                raise error.Abort(msg)
            elif stat.S_ISDIR(st.st_mode) and os.path.isdir(
                os.path.join(curpath, b'.hg')
            ):
                if not self.callback or not self.callback(curpath):
                    msg = _(b"path '%s' is inside nested repo %r")
                    raise error.Abort(msg % (path, pycompat.bytestr(prefix)))

    def check(self, path):
        # type: (bytes) -> bool
        """Return True if ``path`` passes the audit, False otherwise.

        OSError and error.Abort raised by the audit are turned into a
        False result instead of propagating.
        """
        try:
            self(path)
            return True
        except (OSError, error.Abort):
            return False

    @contextlib.contextmanager
    def cached(self):
        """Context manager that enables caching for the enclosed block.

        When caching is already on this is a no-op. Otherwise caching is
        switched on for the duration of the block, then both caches are
        cleared and caching is switched off again, even on error.
        """
        if self._cached:
            yield
        else:
            try:
                self._cached = True
                yield
            finally:
                self.audited.clear()
                self.auditeddir.clear()
                self._cached = False
179 180
180 181
def canonpath(root, cwd, myname, auditor=None):
    # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes
    """return the canonical path of myname, given cwd and root

    A relative ``myname`` is joined onto ``root`` and ``cwd`` and then
    normalized with ``os.path.normpath``. The result is expressed relative
    to ``root``, checked with ``auditor`` (a ``pathauditor(root)`` is
    created when none is given) and returned through ``util.pconvert``.
    ``b''`` is returned when the name designates ``root`` itself.

    error.Abort is raised when the name cannot be located under ``root``;
    the abort carries a ``--cwd`` hint when the name would have resolved
    relative to ``root`` instead of ``cwd``.

    >>> def check(root, cwd, myname):
    ...     a = pathauditor(root, realfs=False)
    ...     try:
    ...         return canonpath(root, cwd, myname, a)
    ...     except error.Abort:
    ...         return 'aborted'
    >>> def unixonly(root, cwd, myname, expected='aborted'):
    ...     if pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> def winonly(root, cwd, myname, expected='aborted'):
    ...     if not pycompat.iswindows:
    ...         return expected
    ...     return check(root, cwd, myname)
    >>> winonly(b'd:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\dir', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'filename')
    'aborted'
    >>> winonly(b'c:\\\\repo', b'c:\\\\', b'repo\\\\filename',
    ...         b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo', b'filename', b'filename')
    'filename'
    >>> winonly(b'c:\\\\repo', b'c:\\\\repo\\\\subdir', b'filename',
    ...         b'subdir/filename')
    'subdir/filename'
    >>> unixonly(b'/repo', b'/dir', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'filename')
    'aborted'
    >>> unixonly(b'/repo', b'/', b'repo/filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo', b'filename', b'filename')
    'filename'
    >>> unixonly(b'/repo', b'/repo/subdir', b'filename', b'subdir/filename')
    'subdir/filename'
    """
    # rootsep is root with exactly one trailing separator, so that the
    # startswith() test below cannot match a sibling such as "/repo2".
    if util.endswithsep(root):
        rootsep = root
    else:
        rootsep = root + pycompat.ossep
    name = myname
    if not os.path.isabs(name):
        name = os.path.join(root, cwd, name)
    name = os.path.normpath(name)
    if auditor is None:
        auditor = pathauditor(root)
    if name != rootsep and name.startswith(rootsep):
        # Fast path: purely lexical match, strip the root prefix.
        name = name[len(rootsep) :]
        auditor(name)
        return util.pconvert(name)
    elif name == root:
        return b''
    else:
        # Determine whether `name' is in the hierarchy at or beneath `root',
        # by iterating name=dirname(name) until that causes no change (can't
        # check name == '/', because that doesn't work on windows). The list
        # `rel' holds the reversed list of components making up the relative
        # file name we want.
        rel = []
        while True:
            try:
                s = util.samefile(name, root)
            except OSError:
                # treat a failing comparison as "not the same file"
                s = False
            if s:
                if not rel:
                    # name was actually the same as root (maybe a symlink)
                    return b''
                rel.reverse()
                name = os.path.join(*rel)
                auditor(name)
                return util.pconvert(name)
            dirname, basename = util.split(name)
            rel.append(basename)
            if dirname == name:
                # reached the top of the hierarchy without meeting root
                break
            name = dirname

        # A common mistake is to use -R, but specify a file relative to the repo
        # instead of cwd. Detect that case, and provide a hint to the user.
        hint = None
        try:
            if cwd != root:
                # Probe only: this call aborts when myname is not valid
                # relative to root either, in which case no hint is set.
                canonpath(root, root, myname, auditor)
                relpath = util.pathto(root, cwd, b'')
                if relpath.endswith(pycompat.ossep):
                    relpath = relpath[:-1]
                hint = _(b"consider using '--cwd %s'") % relpath
        except error.Abort:
            pass

        raise error.Abort(
            _(b"%s not under root '%s'") % (myname, root), hint=hint
        )
282 283
283 284
def normasprefix(path):
    # type: (bytes) -> bytes
    """return ``path`` in a form usable as a path prefix

    The result ends with the OS separator, which makes it safe for
    "p.startswith(prefix)", "p[len(prefix):]", and so on. A path whose
    non-drive part is exactly as long as the separator is returned
    unchanged instead of gaining a second separator.

    For efficiency, "path" is expected to be already normalized by
    "os.path.normpath", "os.path.realpath", and so on.

    See also issue3033 for detail about need of this function.

    >>> normasprefix(b'/foo/bar').replace(pycompat.ossep, b'/')
    '/foo/bar/'
    >>> normasprefix(b'/').replace(pycompat.ossep, b'/')
    '/'
    """
    sep = pycompat.ossep
    _drive, tail = os.path.splitdrive(path)
    if len(tail) == len(sep):
        return path
    return path + sep
306 307
307 308
def finddirs(path):
    # type: (bytes) -> Iterator[bytes]
    """Yield every ancestor directory of a '/'-separated ``path``.

    Ancestors are produced from the deepest one up to the top, and the
    empty bytestring is always yielded last. Each ancestor is simply the
    text before one of the slashes in ``path``; no normalization is done.
    """
    cut = len(path)
    while True:
        # search only to the left of the previously found slash
        cut = path.rfind(b'/', 0, cut)
        if cut == -1:
            break
        yield path[:cut]
    yield b''
315 316
316 317
class dirs:
    '''a multiset of directory names from a set of file paths'''

    def __init__(self, map, only_tracked=False):
        """
        Build the multiset from ``map``.

        a dict map indicates a dirstate while a list indicates a manifest

        With ``only_tracked`` set, ``map`` must be a dict and entries whose
        ``state`` is ``b'r'`` are left out; any other source raises
        error.ProgrammingError.
        """
        self._dirs = {}
        if only_tracked:
            if not isinstance(map, dict):
                msg = b"`only_tracked` is only supported with a dict source"
                raise error.ProgrammingError(msg)
            paths = (f for f, s in map.items() if s.state != b'r')
        else:
            paths = map
        register = self.addpath
        for f in paths:
            register(f)

    def addpath(self, path):
        # type: (bytes) -> None
        """Count ``path`` against each of its ancestor directories.

        Walking stops at the first ancestor that is already present: its
        counter is bumped and the ancestors above it are left alone.
        Raises ValueError when an ancestor name ends with a slash.
        """
        counts = self._dirs
        for base in finddirs(path):
            if base.endswith(b'/'):
                raise ValueError(
                    "found invalid consecutive slashes in path: %r" % base
                )
            seen = counts.get(base)
            if seen is not None:
                counts[base] = seen + 1
                break
            counts[base] = 1

    def delpath(self, path):
        # type: (bytes) -> None
        """Undo one ``addpath(path)``.

        Ancestors whose counter is 1 are removed; the first ancestor with a
        higher counter is decremented and the walk stops there. An ancestor
        that is not present raises KeyError.
        """
        counts = self._dirs
        for base in finddirs(path):
            remaining = counts[base] - 1
            if remaining > 0:
                counts[base] = remaining
                break
            del counts[base]

    def __iter__(self):
        """Iterate over the directory names currently held."""
        return iter(self._dirs)

    def __contains__(self, d):
        # type: (bytes) -> bool
        """Tell whether directory ``d`` is currently held."""
        return d in self._dirs
365 366
366 367
# Replace the pure-Python ``dirs`` class above with the implementation
# exported by the ``parsers`` module, when it has one.
if util.safehasattr(parsers, 'dirs'):
    dirs = parsers.dirs

# The Rust-provided class, when it was importable, wins over both of the
# above because it is assigned last.
if rustdirs is not None:
    dirs = rustdirs


# forward two methods from posixpath that do what we need, but we'd
# rather not let our internals know that we're thinking in posix terms
# - instead we'll let them be oblivious.
join = posixpath.join
dirname = posixpath.dirname  # type: Callable[[bytes], bytes]
General Comments 0
You need to be logged in to leave comments. Login now