##// END OF EJS Templates
typing: use python3-style type annotation
Arseniy Alekseyev -
r50785:5f664401 default
parent child Browse files
Show More
@@ -1,381 +1,380 b''
1 1 import contextlib
2 2 import errno
3 3 import os
4 4 import posixpath
5 5 import stat
6 6
7 7 from .i18n import _
8 8 from . import (
9 9 encoding,
10 10 error,
11 11 policy,
12 12 pycompat,
13 13 util,
14 14 )
15 15
16 16 if pycompat.TYPE_CHECKING:
17 17 from typing import (
18 18 Any,
19 19 Callable,
20 20 Iterator,
21 21 Optional,
22 22 )
23 23
24 24
25 25 rustdirs = policy.importrust('dirstate', 'Dirs')
26 26 parsers = policy.importmod('parsers')
27 27
28 28
29 29 def _lowerclean(s):
30 30 # type: (bytes) -> bytes
31 31 return encoding.hfsignoreclean(s.lower())
32 32
33 33
34 34 class pathauditor:
35 35 """ensure that a filesystem path contains no banned components.
36 36 the following properties of a path are checked:
37 37
38 38 - ends with a directory separator
39 39 - under top-level .hg
40 40 - starts at the root of a windows drive
41 41 - contains ".."
42 42
43 43 More check are also done about the file system states:
44 44 - traverses a symlink (e.g. a/symlink_here/b)
45 45 - inside a nested repository (a callback can be used to approve
46 46 some nested repositories, e.g., subrepositories)
47 47
48 48 The file system checks are only done when 'realfs' is set to True (the
49 49 default). They should be disable then we are auditing path for operation on
50 50 stored history.
51 51
52 52 If 'cached' is set to True, audited paths and sub-directories are cached.
53 53 Be careful to not keep the cache of unmanaged directories for long because
54 54 audited paths may be replaced with symlinks.
55 55 """
56 56
57 57 def __init__(self, root, callback=None, realfs=True, cached=False):
58 58 self.audited = set()
59 59 self.auditeddir = dict()
60 60 self.root = root
61 61 self._realfs = realfs
62 62 self._cached = cached
63 63 self.callback = callback
64 64 if os.path.lexists(root) and not util.fscasesensitive(root):
65 65 self.normcase = util.normcase
66 66 else:
67 67 self.normcase = lambda x: x
68 68
69 69 def __call__(self, path, mode=None):
70 70 # type: (bytes, Optional[Any]) -> None
71 71 """Check the relative path.
72 72 path may contain a pattern (e.g. foodir/**.txt)"""
73 73
74 74 path = util.localpath(path)
75 75 if path in self.audited:
76 76 return
77 77 # AIX ignores "/" at end of path, others raise EISDIR.
78 78 if util.endswithsep(path):
79 79 raise error.InputError(
80 80 _(b"path ends in directory separator: %s") % path
81 81 )
82 82 parts = util.splitpath(path)
83 83 if (
84 84 os.path.splitdrive(path)[0]
85 85 or _lowerclean(parts[0]) in (b'.hg', b'.hg.', b'')
86 86 or pycompat.ospardir in parts
87 87 ):
88 88 raise error.InputError(
89 89 _(b"path contains illegal component: %s") % path
90 90 )
91 91 # Windows shortname aliases
92 92 if b"~" in path:
93 93 for p in parts:
94 94 if b"~" in p:
95 95 first, last = p.split(b"~", 1)
96 96 if last.isdigit() and first.upper() in [b"HG", b"HG8B6C"]:
97 97 raise error.InputError(
98 98 _(b"path contains illegal component: %s") % path
99 99 )
100 100 if b'.hg' in _lowerclean(path):
101 101 lparts = [_lowerclean(p) for p in parts]
102 102 for p in b'.hg', b'.hg.':
103 103 if p in lparts[1:]:
104 104 pos = lparts.index(p)
105 105 base = os.path.join(*parts[:pos])
106 106 raise error.InputError(
107 107 _(b"path '%s' is inside nested repo %r")
108 108 % (path, pycompat.bytestr(base))
109 109 )
110 110
111 111 if self._realfs:
112 112 parts.pop()
113 113 # It's important that we check the path parts starting from the root.
114 114 # We don't want to add "foo/bar/baz" to auditeddir before checking if
115 115 # there's a "foo/.hg" directory. This also means we won't accidentally
116 116 # traverse a symlink into some other filesystem (which is potentially
117 117 # expensive to access).
118 118 for i in range(len(parts)):
119 119 prefix = pycompat.ossep.join(parts[: i + 1])
120 120 if prefix in self.auditeddir:
121 121 res = self.auditeddir[prefix]
122 122 else:
123 123 res = self._checkfs_exists(prefix, path)
124 124 if self._cached:
125 125 self.auditeddir[prefix] = res
126 126 if not res:
127 127 break
128 128
129 129 if self._cached:
130 130 self.audited.add(path)
131 131
132 def _checkfs_exists(self, prefix, path):
133 # type: (bytes, bytes) -> bool
132 def _checkfs_exists(self, prefix: bytes, path: bytes) -> bool:
134 133 """raise exception if a file system backed check fails.
135 134
136 135 Return a bool that indicates that the directory (or file) exists."""
137 136 curpath = os.path.join(self.root, prefix)
138 137 try:
139 138 st = os.lstat(curpath)
140 139 except OSError as err:
141 140 if err.errno == errno.ENOENT:
142 141 return False
143 142 # EINVAL can be raised as invalid path syntax under win32.
144 143 # They must be ignored for patterns can be checked too.
145 144 if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
146 145 raise
147 146 else:
148 147 if stat.S_ISLNK(st.st_mode):
149 148 msg = _(b'path %r traverses symbolic link %r') % (
150 149 pycompat.bytestr(path),
151 150 pycompat.bytestr(prefix),
152 151 )
153 152 raise error.Abort(msg)
154 153 elif stat.S_ISDIR(st.st_mode) and os.path.isdir(
155 154 os.path.join(curpath, b'.hg')
156 155 ):
157 156 if not self.callback or not self.callback(curpath):
158 157 msg = _(b"path '%s' is inside nested repo %r")
159 158 raise error.Abort(msg % (path, pycompat.bytestr(prefix)))
160 159 return True
161 160
162 161 def check(self, path):
163 162 # type: (bytes) -> bool
164 163 try:
165 164 self(path)
166 165 return True
167 166 except (OSError, error.Abort):
168 167 return False
169 168
170 169 @contextlib.contextmanager
171 170 def cached(self):
172 171 if self._cached:
173 172 yield
174 173 else:
175 174 try:
176 175 self._cached = True
177 176 yield
178 177 finally:
179 178 self.audited.clear()
180 179 self.auditeddir.clear()
181 180 self._cached = False
182 181
183 182
184 183 def canonpath(root, cwd, myname, auditor=None):
185 184 # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes
186 185 """return the canonical path of myname, given cwd and root
187 186
188 187 >>> def check(root, cwd, myname):
189 188 ... a = pathauditor(root, realfs=False)
190 189 ... try:
191 190 ... return canonpath(root, cwd, myname, a)
192 191 ... except error.Abort:
193 192 ... return 'aborted'
194 193 >>> def unixonly(root, cwd, myname, expected='aborted'):
195 194 ... if pycompat.iswindows:
196 195 ... return expected
197 196 ... return check(root, cwd, myname)
198 197 >>> def winonly(root, cwd, myname, expected='aborted'):
199 198 ... if not pycompat.iswindows:
200 199 ... return expected
201 200 ... return check(root, cwd, myname)
202 201 >>> winonly(b'd:\\\\repo', b'c:\\\\dir', b'filename')
203 202 'aborted'
204 203 >>> winonly(b'c:\\\\repo', b'c:\\\\dir', b'filename')
205 204 'aborted'
206 205 >>> winonly(b'c:\\\\repo', b'c:\\\\', b'filename')
207 206 'aborted'
208 207 >>> winonly(b'c:\\\\repo', b'c:\\\\', b'repo\\\\filename',
209 208 ... b'filename')
210 209 'filename'
211 210 >>> winonly(b'c:\\\\repo', b'c:\\\\repo', b'filename', b'filename')
212 211 'filename'
213 212 >>> winonly(b'c:\\\\repo', b'c:\\\\repo\\\\subdir', b'filename',
214 213 ... b'subdir/filename')
215 214 'subdir/filename'
216 215 >>> unixonly(b'/repo', b'/dir', b'filename')
217 216 'aborted'
218 217 >>> unixonly(b'/repo', b'/', b'filename')
219 218 'aborted'
220 219 >>> unixonly(b'/repo', b'/', b'repo/filename', b'filename')
221 220 'filename'
222 221 >>> unixonly(b'/repo', b'/repo', b'filename', b'filename')
223 222 'filename'
224 223 >>> unixonly(b'/repo', b'/repo/subdir', b'filename', b'subdir/filename')
225 224 'subdir/filename'
226 225 """
227 226 if util.endswithsep(root):
228 227 rootsep = root
229 228 else:
230 229 rootsep = root + pycompat.ossep
231 230 name = myname
232 231 if not os.path.isabs(name):
233 232 name = os.path.join(root, cwd, name)
234 233 name = os.path.normpath(name)
235 234 if auditor is None:
236 235 auditor = pathauditor(root)
237 236 if name != rootsep and name.startswith(rootsep):
238 237 name = name[len(rootsep) :]
239 238 auditor(name)
240 239 return util.pconvert(name)
241 240 elif name == root:
242 241 return b''
243 242 else:
244 243 # Determine whether `name' is in the hierarchy at or beneath `root',
245 244 # by iterating name=dirname(name) until that causes no change (can't
246 245 # check name == '/', because that doesn't work on windows). The list
247 246 # `rel' holds the reversed list of components making up the relative
248 247 # file name we want.
249 248 rel = []
250 249 while True:
251 250 try:
252 251 s = util.samefile(name, root)
253 252 except OSError:
254 253 s = False
255 254 if s:
256 255 if not rel:
257 256 # name was actually the same as root (maybe a symlink)
258 257 return b''
259 258 rel.reverse()
260 259 name = os.path.join(*rel)
261 260 auditor(name)
262 261 return util.pconvert(name)
263 262 dirname, basename = util.split(name)
264 263 rel.append(basename)
265 264 if dirname == name:
266 265 break
267 266 name = dirname
268 267
269 268 # A common mistake is to use -R, but specify a file relative to the repo
270 269 # instead of cwd. Detect that case, and provide a hint to the user.
271 270 hint = None
272 271 try:
273 272 if cwd != root:
274 273 canonpath(root, root, myname, auditor)
275 274 relpath = util.pathto(root, cwd, b'')
276 275 if relpath.endswith(pycompat.ossep):
277 276 relpath = relpath[:-1]
278 277 hint = _(b"consider using '--cwd %s'") % relpath
279 278 except error.Abort:
280 279 pass
281 280
282 281 raise error.Abort(
283 282 _(b"%s not under root '%s'") % (myname, root), hint=hint
284 283 )
285 284
286 285
287 286 def normasprefix(path):
288 287 # type: (bytes) -> bytes
289 288 """normalize the specified path as path prefix
290 289
291 290 Returned value can be used safely for "p.startswith(prefix)",
292 291 "p[len(prefix):]", and so on.
293 292
294 293 For efficiency, this expects "path" argument to be already
295 294 normalized by "os.path.normpath", "os.path.realpath", and so on.
296 295
297 296 See also issue3033 for detail about need of this function.
298 297
299 298 >>> normasprefix(b'/foo/bar').replace(pycompat.ossep, b'/')
300 299 '/foo/bar/'
301 300 >>> normasprefix(b'/').replace(pycompat.ossep, b'/')
302 301 '/'
303 302 """
304 303 d, p = os.path.splitdrive(path)
305 304 if len(p) != len(pycompat.ossep):
306 305 return path + pycompat.ossep
307 306 else:
308 307 return path
309 308
310 309
311 310 def finddirs(path):
312 311 # type: (bytes) -> Iterator[bytes]
313 312 pos = path.rfind(b'/')
314 313 while pos != -1:
315 314 yield path[:pos]
316 315 pos = path.rfind(b'/', 0, pos)
317 316 yield b''
318 317
319 318
320 319 class dirs:
321 320 '''a multiset of directory names from a set of file paths'''
322 321
323 322 def __init__(self, map, only_tracked=False):
324 323 """
325 324 a dict map indicates a dirstate while a list indicates a manifest
326 325 """
327 326 self._dirs = {}
328 327 addpath = self.addpath
329 328 if isinstance(map, dict) and only_tracked:
330 329 for f, s in map.items():
331 330 if s.state != b'r':
332 331 addpath(f)
333 332 elif only_tracked:
334 333 msg = b"`only_tracked` is only supported with a dict source"
335 334 raise error.ProgrammingError(msg)
336 335 else:
337 336 for f in map:
338 337 addpath(f)
339 338
340 339 def addpath(self, path):
341 340 # type: (bytes) -> None
342 341 dirs = self._dirs
343 342 for base in finddirs(path):
344 343 if base.endswith(b'/'):
345 344 raise ValueError(
346 345 "found invalid consecutive slashes in path: %r" % base
347 346 )
348 347 if base in dirs:
349 348 dirs[base] += 1
350 349 return
351 350 dirs[base] = 1
352 351
353 352 def delpath(self, path):
354 353 # type: (bytes) -> None
355 354 dirs = self._dirs
356 355 for base in finddirs(path):
357 356 if dirs[base] > 1:
358 357 dirs[base] -= 1
359 358 return
360 359 del dirs[base]
361 360
362 361 def __iter__(self):
363 362 return iter(self._dirs)
364 363
365 364 def __contains__(self, d):
366 365 # type: (bytes) -> bool
367 366 return d in self._dirs
368 367
369 368
370 369 if util.safehasattr(parsers, 'dirs'):
371 370 dirs = parsers.dirs
372 371
373 372 if rustdirs is not None:
374 373 dirs = rustdirs
375 374
376 375
377 376 # forward two methods from posixpath that do what we need, but we'd
378 377 # rather not let our internals know that we're thinking in posix terms
379 378 # - instead we'll let them be oblivious.
380 379 join = posixpath.join
381 380 dirname = posixpath.dirname # type: Callable[[bytes], bytes]
General Comments 0
You need to be logged in to leave comments. Login now