##// END OF EJS Templates
typing: import unconditionally...
Arseniy Alekseyev -
r50801:15175774 default
parent child Browse files
Show More
@@ -1,380 +1,378 b''
1 1 import contextlib
2 2 import errno
3 3 import os
4 4 import posixpath
5 5 import stat
6 6
7 from typing import (
8 Any,
9 Callable,
10 Iterator,
11 Optional,
12 )
13
7 14 from .i18n import _
8 15 from . import (
9 16 encoding,
10 17 error,
11 18 policy,
12 19 pycompat,
13 20 util,
14 21 )
15 22
16 if pycompat.TYPE_CHECKING:
17 from typing import (
18 Any,
19 Callable,
20 Iterator,
21 Optional,
22 )
23
24
25 23 rustdirs = policy.importrust('dirstate', 'Dirs')
26 24 parsers = policy.importmod('parsers')
27 25
28 26
29 27 def _lowerclean(s):
30 28 # type: (bytes) -> bytes
31 29 return encoding.hfsignoreclean(s.lower())
32 30
33 31
34 32 class pathauditor:
35 33 """ensure that a filesystem path contains no banned components.
36 34 the following properties of a path are checked:
37 35
38 36 - ends with a directory separator
39 37 - under top-level .hg
40 38 - starts at the root of a windows drive
41 39 - contains ".."
42 40
43 41 More check are also done about the file system states:
44 42 - traverses a symlink (e.g. a/symlink_here/b)
45 43 - inside a nested repository (a callback can be used to approve
46 44 some nested repositories, e.g., subrepositories)
47 45
48 46 The file system checks are only done when 'realfs' is set to True (the
49 47 default). They should be disable then we are auditing path for operation on
50 48 stored history.
51 49
52 50 If 'cached' is set to True, audited paths and sub-directories are cached.
53 51 Be careful to not keep the cache of unmanaged directories for long because
54 52 audited paths may be replaced with symlinks.
55 53 """
56 54
57 55 def __init__(self, root, callback=None, realfs=True, cached=False):
58 56 self.audited = set()
59 57 self.auditeddir = dict()
60 58 self.root = root
61 59 self._realfs = realfs
62 60 self._cached = cached
63 61 self.callback = callback
64 62 if os.path.lexists(root) and not util.fscasesensitive(root):
65 63 self.normcase = util.normcase
66 64 else:
67 65 self.normcase = lambda x: x
68 66
69 67 def __call__(self, path, mode=None):
70 68 # type: (bytes, Optional[Any]) -> None
71 69 """Check the relative path.
72 70 path may contain a pattern (e.g. foodir/**.txt)"""
73 71
74 72 path = util.localpath(path)
75 73 if path in self.audited:
76 74 return
77 75 # AIX ignores "/" at end of path, others raise EISDIR.
78 76 if util.endswithsep(path):
79 77 raise error.InputError(
80 78 _(b"path ends in directory separator: %s") % path
81 79 )
82 80 parts = util.splitpath(path)
83 81 if (
84 82 os.path.splitdrive(path)[0]
85 83 or _lowerclean(parts[0]) in (b'.hg', b'.hg.', b'')
86 84 or pycompat.ospardir in parts
87 85 ):
88 86 raise error.InputError(
89 87 _(b"path contains illegal component: %s") % path
90 88 )
91 89 # Windows shortname aliases
92 90 if b"~" in path:
93 91 for p in parts:
94 92 if b"~" in p:
95 93 first, last = p.split(b"~", 1)
96 94 if last.isdigit() and first.upper() in [b"HG", b"HG8B6C"]:
97 95 raise error.InputError(
98 96 _(b"path contains illegal component: %s") % path
99 97 )
100 98 if b'.hg' in _lowerclean(path):
101 99 lparts = [_lowerclean(p) for p in parts]
102 100 for p in b'.hg', b'.hg.':
103 101 if p in lparts[1:]:
104 102 pos = lparts.index(p)
105 103 base = os.path.join(*parts[:pos])
106 104 raise error.InputError(
107 105 _(b"path '%s' is inside nested repo %r")
108 106 % (path, pycompat.bytestr(base))
109 107 )
110 108
111 109 if self._realfs:
112 110 parts.pop()
113 111 # It's important that we check the path parts starting from the root.
114 112 # We don't want to add "foo/bar/baz" to auditeddir before checking if
115 113 # there's a "foo/.hg" directory. This also means we won't accidentally
116 114 # traverse a symlink into some other filesystem (which is potentially
117 115 # expensive to access).
118 116 for i in range(len(parts)):
119 117 prefix = pycompat.ossep.join(parts[: i + 1])
120 118 if prefix in self.auditeddir:
121 119 res = self.auditeddir[prefix]
122 120 else:
123 121 res = self._checkfs_exists(prefix, path)
124 122 if self._cached:
125 123 self.auditeddir[prefix] = res
126 124 if not res:
127 125 break
128 126
129 127 if self._cached:
130 128 self.audited.add(path)
131 129
132 130 def _checkfs_exists(self, prefix: bytes, path: bytes) -> bool:
133 131 """raise exception if a file system backed check fails.
134 132
135 133 Return a bool that indicates that the directory (or file) exists."""
136 134 curpath = os.path.join(self.root, prefix)
137 135 try:
138 136 st = os.lstat(curpath)
139 137 except OSError as err:
140 138 if err.errno == errno.ENOENT:
141 139 return False
142 140 # EINVAL can be raised as invalid path syntax under win32.
143 141 # They must be ignored for patterns can be checked too.
144 142 if err.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EINVAL):
145 143 raise
146 144 else:
147 145 if stat.S_ISLNK(st.st_mode):
148 146 msg = _(b'path %r traverses symbolic link %r') % (
149 147 pycompat.bytestr(path),
150 148 pycompat.bytestr(prefix),
151 149 )
152 150 raise error.Abort(msg)
153 151 elif stat.S_ISDIR(st.st_mode) and os.path.isdir(
154 152 os.path.join(curpath, b'.hg')
155 153 ):
156 154 if not self.callback or not self.callback(curpath):
157 155 msg = _(b"path '%s' is inside nested repo %r")
158 156 raise error.Abort(msg % (path, pycompat.bytestr(prefix)))
159 157 return True
160 158
161 159 def check(self, path):
162 160 # type: (bytes) -> bool
163 161 try:
164 162 self(path)
165 163 return True
166 164 except (OSError, error.Abort):
167 165 return False
168 166
169 167 @contextlib.contextmanager
170 168 def cached(self):
171 169 if self._cached:
172 170 yield
173 171 else:
174 172 try:
175 173 self._cached = True
176 174 yield
177 175 finally:
178 176 self.audited.clear()
179 177 self.auditeddir.clear()
180 178 self._cached = False
181 179
182 180
183 181 def canonpath(root, cwd, myname, auditor=None):
184 182 # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes
185 183 """return the canonical path of myname, given cwd and root
186 184
187 185 >>> def check(root, cwd, myname):
188 186 ... a = pathauditor(root, realfs=False)
189 187 ... try:
190 188 ... return canonpath(root, cwd, myname, a)
191 189 ... except error.Abort:
192 190 ... return 'aborted'
193 191 >>> def unixonly(root, cwd, myname, expected='aborted'):
194 192 ... if pycompat.iswindows:
195 193 ... return expected
196 194 ... return check(root, cwd, myname)
197 195 >>> def winonly(root, cwd, myname, expected='aborted'):
198 196 ... if not pycompat.iswindows:
199 197 ... return expected
200 198 ... return check(root, cwd, myname)
201 199 >>> winonly(b'd:\\\\repo', b'c:\\\\dir', b'filename')
202 200 'aborted'
203 201 >>> winonly(b'c:\\\\repo', b'c:\\\\dir', b'filename')
204 202 'aborted'
205 203 >>> winonly(b'c:\\\\repo', b'c:\\\\', b'filename')
206 204 'aborted'
207 205 >>> winonly(b'c:\\\\repo', b'c:\\\\', b'repo\\\\filename',
208 206 ... b'filename')
209 207 'filename'
210 208 >>> winonly(b'c:\\\\repo', b'c:\\\\repo', b'filename', b'filename')
211 209 'filename'
212 210 >>> winonly(b'c:\\\\repo', b'c:\\\\repo\\\\subdir', b'filename',
213 211 ... b'subdir/filename')
214 212 'subdir/filename'
215 213 >>> unixonly(b'/repo', b'/dir', b'filename')
216 214 'aborted'
217 215 >>> unixonly(b'/repo', b'/', b'filename')
218 216 'aborted'
219 217 >>> unixonly(b'/repo', b'/', b'repo/filename', b'filename')
220 218 'filename'
221 219 >>> unixonly(b'/repo', b'/repo', b'filename', b'filename')
222 220 'filename'
223 221 >>> unixonly(b'/repo', b'/repo/subdir', b'filename', b'subdir/filename')
224 222 'subdir/filename'
225 223 """
226 224 if util.endswithsep(root):
227 225 rootsep = root
228 226 else:
229 227 rootsep = root + pycompat.ossep
230 228 name = myname
231 229 if not os.path.isabs(name):
232 230 name = os.path.join(root, cwd, name)
233 231 name = os.path.normpath(name)
234 232 if auditor is None:
235 233 auditor = pathauditor(root)
236 234 if name != rootsep and name.startswith(rootsep):
237 235 name = name[len(rootsep) :]
238 236 auditor(name)
239 237 return util.pconvert(name)
240 238 elif name == root:
241 239 return b''
242 240 else:
243 241 # Determine whether `name' is in the hierarchy at or beneath `root',
244 242 # by iterating name=dirname(name) until that causes no change (can't
245 243 # check name == '/', because that doesn't work on windows). The list
246 244 # `rel' holds the reversed list of components making up the relative
247 245 # file name we want.
248 246 rel = []
249 247 while True:
250 248 try:
251 249 s = util.samefile(name, root)
252 250 except OSError:
253 251 s = False
254 252 if s:
255 253 if not rel:
256 254 # name was actually the same as root (maybe a symlink)
257 255 return b''
258 256 rel.reverse()
259 257 name = os.path.join(*rel)
260 258 auditor(name)
261 259 return util.pconvert(name)
262 260 dirname, basename = util.split(name)
263 261 rel.append(basename)
264 262 if dirname == name:
265 263 break
266 264 name = dirname
267 265
268 266 # A common mistake is to use -R, but specify a file relative to the repo
269 267 # instead of cwd. Detect that case, and provide a hint to the user.
270 268 hint = None
271 269 try:
272 270 if cwd != root:
273 271 canonpath(root, root, myname, auditor)
274 272 relpath = util.pathto(root, cwd, b'')
275 273 if relpath.endswith(pycompat.ossep):
276 274 relpath = relpath[:-1]
277 275 hint = _(b"consider using '--cwd %s'") % relpath
278 276 except error.Abort:
279 277 pass
280 278
281 279 raise error.Abort(
282 280 _(b"%s not under root '%s'") % (myname, root), hint=hint
283 281 )
284 282
285 283
286 284 def normasprefix(path):
287 285 # type: (bytes) -> bytes
288 286 """normalize the specified path as path prefix
289 287
290 288 Returned value can be used safely for "p.startswith(prefix)",
291 289 "p[len(prefix):]", and so on.
292 290
293 291 For efficiency, this expects "path" argument to be already
294 292 normalized by "os.path.normpath", "os.path.realpath", and so on.
295 293
296 294 See also issue3033 for detail about need of this function.
297 295
298 296 >>> normasprefix(b'/foo/bar').replace(pycompat.ossep, b'/')
299 297 '/foo/bar/'
300 298 >>> normasprefix(b'/').replace(pycompat.ossep, b'/')
301 299 '/'
302 300 """
303 301 d, p = os.path.splitdrive(path)
304 302 if len(p) != len(pycompat.ossep):
305 303 return path + pycompat.ossep
306 304 else:
307 305 return path
308 306
309 307
310 308 def finddirs(path):
311 309 # type: (bytes) -> Iterator[bytes]
312 310 pos = path.rfind(b'/')
313 311 while pos != -1:
314 312 yield path[:pos]
315 313 pos = path.rfind(b'/', 0, pos)
316 314 yield b''
317 315
318 316
319 317 class dirs:
320 318 '''a multiset of directory names from a set of file paths'''
321 319
322 320 def __init__(self, map, only_tracked=False):
323 321 """
324 322 a dict map indicates a dirstate while a list indicates a manifest
325 323 """
326 324 self._dirs = {}
327 325 addpath = self.addpath
328 326 if isinstance(map, dict) and only_tracked:
329 327 for f, s in map.items():
330 328 if s.state != b'r':
331 329 addpath(f)
332 330 elif only_tracked:
333 331 msg = b"`only_tracked` is only supported with a dict source"
334 332 raise error.ProgrammingError(msg)
335 333 else:
336 334 for f in map:
337 335 addpath(f)
338 336
339 337 def addpath(self, path):
340 338 # type: (bytes) -> None
341 339 dirs = self._dirs
342 340 for base in finddirs(path):
343 341 if base.endswith(b'/'):
344 342 raise ValueError(
345 343 "found invalid consecutive slashes in path: %r" % base
346 344 )
347 345 if base in dirs:
348 346 dirs[base] += 1
349 347 return
350 348 dirs[base] = 1
351 349
352 350 def delpath(self, path):
353 351 # type: (bytes) -> None
354 352 dirs = self._dirs
355 353 for base in finddirs(path):
356 354 if dirs[base] > 1:
357 355 dirs[base] -= 1
358 356 return
359 357 del dirs[base]
360 358
361 359 def __iter__(self):
362 360 return iter(self._dirs)
363 361
364 362 def __contains__(self, d):
365 363 # type: (bytes) -> bool
366 364 return d in self._dirs
367 365
368 366
369 367 if util.safehasattr(parsers, 'dirs'):
370 368 dirs = parsers.dirs
371 369
372 370 if rustdirs is not None:
373 371 dirs = rustdirs
374 372
375 373
376 374 # forward two methods from posixpath that do what we need, but we'd
377 375 # rather not let our internals know that we're thinking in posix terms
378 376 # - instead we'll let them be oblivious.
379 377 join = posixpath.join
380 378 dirname = posixpath.dirname # type: Callable[[bytes], bytes]
General Comments 0
You need to be logged in to leave comments. Login now