##// END OF EJS Templates
typing: add type hints to the common posix/windows platform functions...
Matt Harbison -
r50707:58dff81f default
parent child Browse files
Show More
@@ -1,759 +1,774 b''
1 1 # posix.py - Posix utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import fcntl
11 11 import getpass
12 12 import grp
13 13 import os
14 14 import pwd
15 15 import re
16 16 import select
17 17 import stat
18 18 import sys
19 19 import tempfile
20 20 import unicodedata
21 21
22 22 from typing import (
23 Iterable,
24 Iterator,
23 25 List,
24 26 NoReturn,
25 27 Optional,
28 Sequence,
29 Union,
26 30 )
27 31
28 32 from .i18n import _
29 33 from .pycompat import (
30 34 getattr,
31 35 open,
32 36 )
33 37 from . import (
34 38 encoding,
35 39 error,
36 40 policy,
37 41 pycompat,
38 42 )
39 43
40 44 osutil = policy.importmod('osutil')
41 45
42 46 normpath = os.path.normpath
43 47 samestat = os.path.samestat
44 48 abspath = os.path.abspath # re-exports
45 49
46 50 try:
47 51 oslink = os.link
48 52 except AttributeError:
49 53 # Some platforms build Python without os.link on systems that are
50 54 # vaguely unix-like but don't have hardlink support. For those
51 55 # poor souls, just say we tried and that it failed so we fall back
52 56 # to copies.
53 57 def oslink(src: bytes, dst: bytes) -> NoReturn:
54 58 raise OSError(
55 59 errno.EINVAL, b'hardlinks not supported: %s to %s' % (src, dst)
56 60 )
57 61
58 62
59 63 readlink = os.readlink
60 64 unlink = os.unlink
61 65 rename = os.rename
62 66 removedirs = os.removedirs
63 67 expandglobs = False
64 68
65 69 umask = os.umask(0)
66 70 os.umask(umask)
67 71
68 72 posixfile = open
69 73
70 74
71 75 def split(p):
72 76 """Same as posixpath.split, but faster
73 77
74 78 >>> import posixpath
75 79 >>> for f in [b'/absolute/path/to/file',
76 80 ... b'relative/path/to/file',
77 81 ... b'file_alone',
78 82 ... b'path/to/directory/',
79 83 ... b'/multiple/path//separators',
80 84 ... b'/file_at_root',
81 85 ... b'///multiple_leading_separators_at_root',
82 86 ... b'']:
83 87 ... assert split(f) == posixpath.split(f), f
84 88 """
85 89 ht = p.rsplit(b'/', 1)
86 90 if len(ht) == 1:
87 91 return b'', p
88 92 nh = ht[0].rstrip(b'/')
89 93 if nh:
90 94 return nh, ht[1]
91 95 return ht[0] + b'/', ht[1]
92 96
93 97
94 def openhardlinks():
98 def openhardlinks() -> bool:
95 99 '''return true if it is safe to hold open file handles to hardlinks'''
96 100 return True
97 101
98 102
99 103 def nlinks(name: bytes) -> int:
100 104 '''return number of hardlinks for the given file'''
101 105 return os.lstat(name).st_nlink
102 106
103 107
104 def parsepatchoutput(output_line):
108 def parsepatchoutput(output_line: bytes) -> bytes:
105 109 """parses the output produced by patch and returns the filename"""
106 110 pf = output_line[14:]
107 111 if pycompat.sysplatform == b'OpenVMS':
108 112 if pf[0] == b'`':
109 113 pf = pf[1:-1] # Remove the quotes
110 114 else:
111 115 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf:
112 116 pf = pf[1:-1] # Remove the quotes
113 117 return pf
114 118
115 119
116 def sshargs(sshcmd, host, user, port):
120 def sshargs(
121 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
122 ) -> bytes:
117 123 '''Build argument list for ssh'''
118 124 args = user and (b"%s@%s" % (user, host)) or host
119 125 if b'-' in args[:1]:
120 126 raise error.Abort(
121 127 _(b'illegal ssh hostname or username starting with -: %s') % args
122 128 )
123 129 args = shellquote(args)
124 130 if port:
125 131 args = b'-p %s %s' % (shellquote(port), args)
126 132 return args
127 133
128 134
129 def isexec(f):
135 def isexec(f: bytes) -> bool:
130 136 """check whether a file is executable"""
131 137 return os.lstat(f).st_mode & 0o100 != 0
132 138
133 139
134 def setflags(f, l, x):
140 def setflags(f: bytes, l: bool, x: bool) -> None:
135 141 st = os.lstat(f)
136 142 s = st.st_mode
137 143 if l:
138 144 if not stat.S_ISLNK(s):
139 145 # switch file to link
140 146 with open(f, b'rb') as fp:
141 147 data = fp.read()
142 148 unlink(f)
143 149 try:
144 150 os.symlink(data, f)
145 151 except OSError:
146 152 # failed to make a link, rewrite file
147 153 with open(f, b"wb") as fp:
148 154 fp.write(data)
149 155
150 156 # no chmod needed at this point
151 157 return
152 158 if stat.S_ISLNK(s):
153 159 # switch link to file
154 160 data = os.readlink(f)
155 161 unlink(f)
156 162 with open(f, b"wb") as fp:
157 163 fp.write(data)
158 164 s = 0o666 & ~umask # avoid restatting for chmod
159 165
160 166 sx = s & 0o100
161 167 if st.st_nlink > 1 and bool(x) != bool(sx):
162 168 # the file is a hardlink, break it
163 169 with open(f, b"rb") as fp:
164 170 data = fp.read()
165 171 unlink(f)
166 172 with open(f, b"wb") as fp:
167 173 fp.write(data)
168 174
169 175 if x and not sx:
170 176 # Turn on +x for every +r bit when making a file executable
171 177 # and obey umask.
172 178 os.chmod(f, s | (s & 0o444) >> 2 & ~umask)
173 179 elif not x and sx:
174 180 # Turn off all +x bits
175 181 os.chmod(f, s & 0o666)
176 182
177 183
178 def copymode(src, dst, mode=None, enforcewritable=False):
184 def copymode(
185 src: bytes,
186 dst: bytes,
187 mode: Optional[bytes] = None,
188 enforcewritable: bool = False,
189 ) -> None:
179 190 """Copy the file mode from the file at path src to dst.
180 191 If src doesn't exist, we're using mode instead. If mode is None, we're
181 192 using umask."""
182 193 try:
183 194 st_mode = os.lstat(src).st_mode & 0o777
184 195 except FileNotFoundError:
185 196 st_mode = mode
186 197 if st_mode is None:
187 198 st_mode = ~umask
188 199 st_mode &= 0o666
189 200
190 201 new_mode = st_mode
191 202
192 203 if enforcewritable:
193 204 new_mode |= stat.S_IWUSR
194 205
195 206 os.chmod(dst, new_mode)
196 207
197 208
198 def checkexec(path):
209 def checkexec(path: bytes) -> bool:
199 210 """
200 211 Check whether the given path is on a filesystem with UNIX-like exec flags
201 212
202 213 Requires a directory (like /foo/.hg)
203 214 """
204 215
205 216 # VFAT on some Linux versions can flip mode but it doesn't persist
206 217 # a FS remount. Frequently we can detect it if files are created
207 218 # with exec bit on.
208 219
209 220 try:
210 221 EXECFLAGS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
211 222 basedir = os.path.join(path, b'.hg')
212 223 cachedir = os.path.join(basedir, b'wcache')
213 224 storedir = os.path.join(basedir, b'store')
214 225 if not os.path.exists(cachedir):
215 226 try:
216 227 # we want to create the 'cache' directory, not the '.hg' one.
217 228 # Automatically creating '.hg' directory could silently spawn
218 229 # invalid Mercurial repositories. That seems like a bad idea.
219 230 os.mkdir(cachedir)
220 231 if os.path.exists(storedir):
221 232 copymode(storedir, cachedir)
222 233 else:
223 234 copymode(basedir, cachedir)
224 235 except (IOError, OSError):
225 236 # we other fallback logic triggers
226 237 pass
227 238 if os.path.isdir(cachedir):
228 239 checkisexec = os.path.join(cachedir, b'checkisexec')
229 240 checknoexec = os.path.join(cachedir, b'checknoexec')
230 241
231 242 try:
232 243 m = os.stat(checkisexec).st_mode
233 244 except FileNotFoundError:
234 245 # checkisexec does not exist - fall through ...
235 246 pass
236 247 else:
237 248 # checkisexec exists, check if it actually is exec
238 249 if m & EXECFLAGS != 0:
239 250 # ensure checkisexec exists, check it isn't exec
240 251 try:
241 252 m = os.stat(checknoexec).st_mode
242 253 except FileNotFoundError:
243 254 open(checknoexec, b'w').close() # might fail
244 255 m = os.stat(checknoexec).st_mode
245 256 if m & EXECFLAGS == 0:
246 257 # check-exec is exec and check-no-exec is not exec
247 258 return True
248 259 # checknoexec exists but is exec - delete it
249 260 unlink(checknoexec)
250 261 # checkisexec exists but is not exec - delete it
251 262 unlink(checkisexec)
252 263
253 264 # check using one file, leave it as checkisexec
254 265 checkdir = cachedir
255 266 else:
256 267 # check directly in path and don't leave checkisexec behind
257 268 checkdir = path
258 269 checkisexec = None
259 270 fh, fn = pycompat.mkstemp(dir=checkdir, prefix=b'hg-checkexec-')
260 271 try:
261 272 os.close(fh)
262 273 m = os.stat(fn).st_mode
263 274 if m & EXECFLAGS == 0:
264 275 os.chmod(fn, m & 0o777 | EXECFLAGS)
265 276 if os.stat(fn).st_mode & EXECFLAGS != 0:
266 277 if checkisexec is not None:
267 278 os.rename(fn, checkisexec)
268 279 fn = None
269 280 return True
270 281 finally:
271 282 if fn is not None:
272 283 unlink(fn)
273 284 except (IOError, OSError):
274 285 # we don't care, the user probably won't be able to commit anyway
275 286 return False
276 287
277 288
278 def checklink(path):
289 def checklink(path: bytes) -> bool:
279 290 """check whether the given path is on a symlink-capable filesystem"""
280 291 # mktemp is not racy because symlink creation will fail if the
281 292 # file already exists
282 293 while True:
283 294 cachedir = os.path.join(path, b'.hg', b'wcache')
284 295 checklink = os.path.join(cachedir, b'checklink')
285 296 # try fast path, read only
286 297 if os.path.islink(checklink):
287 298 return True
288 299 if os.path.isdir(cachedir):
289 300 checkdir = cachedir
290 301 else:
291 302 checkdir = path
292 303 cachedir = None
293 304 name = tempfile.mktemp(
294 305 dir=pycompat.fsdecode(checkdir), prefix=r'checklink-'
295 306 )
296 307 name = pycompat.fsencode(name)
297 308 try:
298 309 fd = None
299 310 if cachedir is None:
300 311 fd = pycompat.namedtempfile(
301 312 dir=checkdir, prefix=b'hg-checklink-'
302 313 )
303 314 target = os.path.basename(fd.name)
304 315 else:
305 316 # create a fixed file to link to; doesn't matter if it
306 317 # already exists.
307 318 target = b'checklink-target'
308 319 try:
309 320 fullpath = os.path.join(cachedir, target)
310 321 open(fullpath, b'w').close()
311 322 except PermissionError:
312 323 # If we can't write to cachedir, just pretend
313 324 # that the fs is readonly and by association
314 325 # that the fs won't support symlinks. This
315 326 # seems like the least dangerous way to avoid
316 327 # data loss.
317 328 return False
318 329 try:
319 330 os.symlink(target, name)
320 331 if cachedir is None:
321 332 unlink(name)
322 333 else:
323 334 try:
324 335 os.rename(name, checklink)
325 336 except OSError:
326 337 unlink(name)
327 338 return True
328 339 except FileExistsError:
329 340 # link creation might race, try again
330 341 continue
331 342 finally:
332 343 if fd is not None:
333 344 fd.close()
334 345 except AttributeError:
335 346 return False
336 347 except OSError as inst:
337 348 # sshfs might report failure while successfully creating the link
338 349 if inst.errno == errno.EIO and os.path.exists(name):
339 350 unlink(name)
340 351 return False
341 352
342 353
343 354 def checkosfilename(path):
344 355 """Check that the base-relative path is a valid filename on this platform.
345 356 Returns None if the path is ok, or a UI string describing the problem."""
346 357 return None # on posix platforms, every path is ok
347 358
348 359
349 360 def getfsmountpoint(dirpath):
350 361 """Get the filesystem mount point from a directory (best-effort)
351 362
352 363 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
353 364 """
354 365 return getattr(osutil, 'getfsmountpoint', lambda x: None)(dirpath)
355 366
356 367
357 368 def getfstype(dirpath: bytes) -> Optional[bytes]:
358 369 """Get the filesystem type name from a directory (best-effort)
359 370
360 371 Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
361 372 """
362 373 return getattr(osutil, 'getfstype', lambda x: None)(dirpath)
363 374
364 375
365 def get_password():
376 def get_password() -> bytes:
366 377 return encoding.strtolocal(getpass.getpass(''))
367 378
368 379
369 def setbinary(fd):
380 def setbinary(fd) -> None:
370 381 pass
371 382
372 383
373 def pconvert(path):
384 def pconvert(path: bytes) -> bytes:
374 385 return path
375 386
376 387
377 def localpath(path):
388 def localpath(path: bytes) -> bytes:
378 389 return path
379 390
380 391
381 392 def samefile(fpath1: bytes, fpath2: bytes) -> bool:
382 393 """Returns whether path1 and path2 refer to the same file. This is only
383 394 guaranteed to work for files, not directories."""
384 395 return os.path.samefile(fpath1, fpath2)
385 396
386 397
387 398 def samedevice(fpath1: bytes, fpath2: bytes) -> bool:
388 399 """Returns whether fpath1 and fpath2 are on the same device. This is only
389 400 guaranteed to work for files, not directories."""
390 401 st1 = os.lstat(fpath1)
391 402 st2 = os.lstat(fpath2)
392 403 return st1.st_dev == st2.st_dev
393 404
394 405
395 406 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems
396 def normcase(path):
407 def normcase(path: bytes) -> bytes:
397 408 return path.lower()
398 409
399 410
400 411 # what normcase does to ASCII strings
401 412 normcasespec = encoding.normcasespecs.lower
402 413 # fallback normcase function for non-ASCII strings
403 414 normcasefallback = normcase
404 415
405 416 if pycompat.isdarwin:
406 417
407 def normcase(path):
418 def normcase(path: bytes) -> bytes:
408 419 """
409 420 Normalize a filename for OS X-compatible comparison:
410 421 - escape-encode invalid characters
411 422 - decompose to NFD
412 423 - lowercase
413 424 - omit ignored characters [200c-200f, 202a-202e, 206a-206f,feff]
414 425
415 426 >>> normcase(b'UPPER')
416 427 'upper'
417 428 >>> normcase(b'Caf\\xc3\\xa9')
418 429 'cafe\\xcc\\x81'
419 430 >>> normcase(b'\\xc3\\x89')
420 431 'e\\xcc\\x81'
421 432 >>> normcase(b'\\xb8\\xca\\xc3\\xca\\xbe\\xc8.JPG') # issue3918
422 433 '%b8%ca%c3\\xca\\xbe%c8.jpg'
423 434 """
424 435
425 436 try:
426 437 return encoding.asciilower(path) # exception for non-ASCII
427 438 except UnicodeDecodeError:
428 439 return normcasefallback(path)
429 440
430 441 normcasespec = encoding.normcasespecs.lower
431 442
432 def normcasefallback(path):
443 def normcasefallback(path: bytes) -> bytes:
433 444 try:
434 445 u = path.decode('utf-8')
435 446 except UnicodeDecodeError:
436 447 # OS X percent-encodes any bytes that aren't valid utf-8
437 448 s = b''
438 449 pos = 0
439 450 l = len(path)
440 451 while pos < l:
441 452 try:
442 453 c = encoding.getutf8char(path, pos)
443 454 pos += len(c)
444 455 except ValueError:
445 456 c = b'%%%02X' % ord(path[pos : pos + 1])
446 457 pos += 1
447 458 s += c
448 459
449 460 u = s.decode('utf-8')
450 461
451 462 # Decompose then lowercase (HFS+ technote specifies lower)
452 463 enc = unicodedata.normalize('NFD', u).lower().encode('utf-8')
453 464 # drop HFS+ ignored characters
454 465 return encoding.hfsignoreclean(enc)
455 466
456 467
457 468 if pycompat.sysplatform == b'cygwin':
458 469 # workaround for cygwin, in which mount point part of path is
459 470 # treated as case sensitive, even though underlying NTFS is case
460 471 # insensitive.
461 472
462 473 # default mount points
463 474 cygwinmountpoints = sorted(
464 475 [
465 476 b"/usr/bin",
466 477 b"/usr/lib",
467 478 b"/cygdrive",
468 479 ],
469 480 reverse=True,
470 481 )
471 482
472 483 # use upper-ing as normcase as same as NTFS workaround
473 def normcase(path):
484 def normcase(path: bytes) -> bytes:
474 485 pathlen = len(path)
475 486 if (pathlen == 0) or (path[0] != pycompat.ossep):
476 487 # treat as relative
477 488 return encoding.upper(path)
478 489
479 490 # to preserve case of mountpoint part
480 491 for mp in cygwinmountpoints:
481 492 if not path.startswith(mp):
482 493 continue
483 494
484 495 mplen = len(mp)
485 496 if mplen == pathlen: # mount point itself
486 497 return mp
487 498 if path[mplen] == pycompat.ossep:
488 499 return mp + encoding.upper(path[mplen:])
489 500
490 501 return encoding.upper(path)
491 502
492 503 normcasespec = encoding.normcasespecs.other
493 504 normcasefallback = normcase
494 505
495 506 # Cygwin translates native ACLs to POSIX permissions,
496 507 # but these translations are not supported by native
497 508 # tools, so the exec bit tends to be set erroneously.
498 509 # Therefore, disable executable bit access on Cygwin.
499 def checkexec(path):
510 def checkexec(path: bytes) -> bool:
500 511 return False
501 512
502 513 # Similarly, Cygwin's symlink emulation is likely to create
503 514 # problems when Mercurial is used from both Cygwin and native
504 515 # Windows, with other native tools, or on shared volumes
505 def checklink(path):
516 def checklink(path: bytes) -> bool:
506 517 return False
507 518
508 519
509 520 _needsshellquote = None
510 521
511 522
512 def shellquote(s):
523 def shellquote(s: bytes) -> bytes:
513 524 if pycompat.sysplatform == b'OpenVMS':
514 525 return b'"%s"' % s
515 526 global _needsshellquote
516 527 if _needsshellquote is None:
517 528 _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search
518 529 if s and not _needsshellquote(s):
519 530 # "s" shouldn't have to be quoted
520 531 return s
521 532 else:
522 533 return b"'%s'" % s.replace(b"'", b"'\\''")
523 534
524 535
525 def shellsplit(s):
536 def shellsplit(s: bytes) -> List[bytes]:
526 537 """Parse a command string in POSIX shell way (best-effort)"""
527 538 return pycompat.shlexsplit(s, posix=True)
528 539
529 540
530 541 def testpid(pid: int) -> bool:
531 542 '''return False if pid dead, True if running or not sure'''
532 543 if pycompat.sysplatform == b'OpenVMS':
533 544 return True
534 545 try:
535 546 os.kill(pid, 0)
536 547 return True
537 548 except OSError as inst:
538 549 return inst.errno != errno.ESRCH
539 550
540 551
541 def isowner(st):
552 def isowner(st: os.stat_result) -> bool:
542 553 """Return True if the stat object st is from the current user."""
543 554 return st.st_uid == os.getuid()
544 555
545 556
546 def findexe(command):
557 def findexe(command: bytes) -> Optional[bytes]:
547 558 """Find executable for command searching like which does.
548 559 If command is a basename then PATH is searched for command.
549 560 PATH isn't searched if command is an absolute or relative path.
550 561 If command isn't found None is returned."""
551 562 if pycompat.sysplatform == b'OpenVMS':
552 563 return command
553 564
554 def findexisting(executable):
565 def findexisting(executable: bytes) -> Optional[bytes]:
555 566 b'Will return executable if existing file'
556 567 if os.path.isfile(executable) and os.access(executable, os.X_OK):
557 568 return executable
558 569 return None
559 570
560 571 if pycompat.ossep in command:
561 572 return findexisting(command)
562 573
563 574 if pycompat.sysplatform == b'plan9':
564 575 return findexisting(os.path.join(b'/bin', command))
565 576
566 577 for path in encoding.environ.get(b'PATH', b'').split(pycompat.ospathsep):
567 578 executable = findexisting(os.path.join(path, command))
568 579 if executable is not None:
569 580 return executable
570 581 return None
571 582
572 583
573 584 def setsignalhandler() -> None:
574 585 pass
575 586
576 587
577 588 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
578 589
579 590
580 def statfiles(files):
591 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
581 592 """Stat each file in files. Yield each stat, or None if a file does not
582 593 exist or has a type we don't care about."""
583 594 lstat = os.lstat
584 595 getkind = stat.S_IFMT
585 596 for nf in files:
586 597 try:
587 598 st = lstat(nf)
588 599 if getkind(st.st_mode) not in _wantedkinds:
589 600 st = None
590 601 except (FileNotFoundError, NotADirectoryError):
591 602 st = None
592 603 yield st
593 604
594 605
595 606 def getuser() -> bytes:
596 607 '''return name of current user'''
597 608 return pycompat.fsencode(getpass.getuser())
598 609
599 610
600 def username(uid=None):
611 def username(uid: Optional[int] = None) -> Optional[bytes]:
601 612 """Return the name of the user with the given uid.
602 613
603 614 If uid is None, return the name of the current user."""
604 615
605 616 if uid is None:
606 617 uid = os.getuid()
607 618 try:
608 619 return pycompat.fsencode(pwd.getpwuid(uid)[0])
609 620 except KeyError:
610 621 return b'%d' % uid
611 622
612 623
613 def groupname(gid=None):
624 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
614 625 """Return the name of the group with the given gid.
615 626
616 627 If gid is None, return the name of the current group."""
617 628
618 629 if gid is None:
619 630 gid = os.getgid()
620 631 try:
621 632 return pycompat.fsencode(grp.getgrgid(gid)[0])
622 633 except KeyError:
623 634 return pycompat.bytestr(gid)
624 635
625 636
626 def groupmembers(name):
637 def groupmembers(name: bytes) -> List[bytes]:
627 638 """Return the list of members of the group with the given
628 639 name, KeyError if the group does not exist.
629 640 """
630 641 name = pycompat.fsdecode(name)
631 642 return pycompat.rapply(pycompat.fsencode, list(grp.getgrnam(name).gr_mem))
632 643
633 644
634 645 def spawndetached(args: List[bytes]) -> int:
635 646 return os.spawnvp(os.P_NOWAIT | getattr(os, 'P_DETACH', 0), args[0], args)
636 647
637 648
638 649 def gethgcmd():
639 650 return sys.argv[:1]
640 651
641 652
642 653 def makedir(path: bytes, notindexed: bool) -> None:
643 654 os.mkdir(path)
644 655
645 656
646 def lookupreg(key, name=None, scope=None):
657 def lookupreg(
658 key: bytes,
659 name: Optional[bytes] = None,
660 scope: Optional[Union[int, Iterable[int]]] = None,
661 ) -> Optional[bytes]:
647 662 return None
648 663
649 664
650 665 def hidewindow() -> None:
651 666 """Hide current shell window.
652 667
653 668 Used to hide the window opened when starting asynchronous
654 669 child process under Windows, unneeded on other systems.
655 670 """
656 671 pass
657 672
658 673
659 674 class cachestat:
660 675 def __init__(self, path):
661 676 self.stat = os.stat(path)
662 677
663 678 def cacheable(self):
664 679 return bool(self.stat.st_ino)
665 680
666 681 __hash__ = object.__hash__
667 682
668 683 def __eq__(self, other):
669 684 try:
670 685 # Only dev, ino, size, mtime and atime are likely to change. Out
671 686 # of these, we shouldn't compare atime but should compare the
672 687 # rest. However, one of the other fields changing indicates
673 688 # something fishy going on, so return False if anything but atime
674 689 # changes.
675 690 return (
676 691 self.stat.st_mode == other.stat.st_mode
677 692 and self.stat.st_ino == other.stat.st_ino
678 693 and self.stat.st_dev == other.stat.st_dev
679 694 and self.stat.st_nlink == other.stat.st_nlink
680 695 and self.stat.st_uid == other.stat.st_uid
681 696 and self.stat.st_gid == other.stat.st_gid
682 697 and self.stat.st_size == other.stat.st_size
683 698 and self.stat[stat.ST_MTIME] == other.stat[stat.ST_MTIME]
684 699 and self.stat[stat.ST_CTIME] == other.stat[stat.ST_CTIME]
685 700 )
686 701 except AttributeError:
687 702 return False
688 703
689 704 def __ne__(self, other):
690 705 return not self == other
691 706
692 707
693 def statislink(st):
708 def statislink(st: Optional[os.stat_result]) -> bool:
694 709 '''check whether a stat result is a symlink'''
695 return st and stat.S_ISLNK(st.st_mode)
710 return stat.S_ISLNK(st.st_mode) if st else False
696 711
697 712
698 def statisexec(st):
713 def statisexec(st: Optional[os.stat_result]) -> bool:
699 714 '''check whether a stat result is an executable file'''
700 return st and (st.st_mode & 0o100 != 0)
715 return (st.st_mode & 0o100 != 0) if st else False
701 716
702 717
703 718 def poll(fds):
704 719 """block until something happens on any file descriptor
705 720
706 721 This is a generic helper that will check for any activity
707 722 (read, write. exception) and return the list of touched files.
708 723
709 724 In unsupported cases, it will raise a NotImplementedError"""
710 725 try:
711 726 res = select.select(fds, fds, fds)
712 727 except ValueError: # out of range file descriptor
713 728 raise NotImplementedError()
714 729 return sorted(list(set(sum(res, []))))
715 730
716 731
717 def readpipe(pipe):
732 def readpipe(pipe) -> bytes:
718 733 """Read all available data from a pipe."""
719 734 # We can't fstat() a pipe because Linux will always report 0.
720 735 # So, we set the pipe to non-blocking mode and read everything
721 736 # that's available.
722 737 flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
723 738 flags |= os.O_NONBLOCK
724 739 oldflags = fcntl.fcntl(pipe, fcntl.F_SETFL, flags)
725 740
726 741 try:
727 742 chunks = []
728 743 while True:
729 744 try:
730 745 s = pipe.read()
731 746 if not s:
732 747 break
733 748 chunks.append(s)
734 749 except IOError:
735 750 break
736 751
737 752 return b''.join(chunks)
738 753 finally:
739 754 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
740 755
741 756
742 def bindunixsocket(sock, path):
757 def bindunixsocket(sock, path: bytes) -> None:
743 758 """Bind the UNIX domain socket to the specified path"""
744 759 # use relative path instead of full path at bind() if possible, since
745 760 # AF_UNIX path has very small length limit (107 chars) on common
746 761 # platforms (see sys/un.h)
747 762 dirname, basename = os.path.split(path)
748 763 bakwdfd = None
749 764
750 765 try:
751 766 if dirname:
752 767 bakwdfd = os.open(b'.', os.O_DIRECTORY)
753 768 os.chdir(dirname)
754 769 sock.bind(basename)
755 770 if bakwdfd:
756 771 os.fchdir(bakwdfd)
757 772 finally:
758 773 if bakwdfd:
759 774 os.close(bakwdfd)
@@ -1,728 +1,746 b''
1 1 # windows.py - Windows utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import getpass
11 11 import msvcrt # pytype: disable=import-error
12 12 import os
13 13 import re
14 14 import stat
15 15 import string
16 16 import sys
17 17 import winreg # pytype: disable=import-error
18 18
19 19 from typing import (
20 20 BinaryIO,
21 Iterable,
22 Iterator,
23 List,
24 NoReturn,
25 Optional,
26 Sequence,
27 Union,
21 28 )
22 29
23 30 from .i18n import _
24 31 from .pycompat import getattr
25 32 from . import (
26 33 encoding,
27 34 error,
28 35 policy,
29 36 pycompat,
30 37 typelib,
31 38 win32,
32 39 )
33 40
34 41
35 42 osutil = policy.importmod('osutil')
36 43
37 44 getfsmountpoint = win32.getvolumename
38 45 getfstype = win32.getfstype
39 46 getuser = win32.getuser
40 47 hidewindow = win32.hidewindow
41 48 makedir = win32.makedir
42 49 nlinks = win32.nlinks
43 50 oslink = win32.oslink
44 51 samedevice = win32.samedevice
45 52 samefile = win32.samefile
46 53 setsignalhandler = win32.setsignalhandler
47 54 spawndetached = win32.spawndetached
48 55 split = os.path.split
49 56 testpid = win32.testpid
50 57 unlink = win32.unlink
51 58
52 59 umask = 0o022
53 60
54 61
55 62 class mixedfilemodewrapper:
56 63 """Wraps a file handle when it is opened in read/write mode.
57 64
58 65 fopen() and fdopen() on Windows have a specific-to-Windows requirement
59 66 that files opened with mode r+, w+, or a+ make a call to a file positioning
60 67 function when switching between reads and writes. Without this extra call,
61 68 Python will raise a not very intuitive "IOError: [Errno 0] Error."
62 69
63 70 This class wraps posixfile instances when the file is opened in read/write
64 71 mode and automatically adds checks or inserts appropriate file positioning
65 72 calls when necessary.
66 73 """
67 74
68 75 OPNONE = 0
69 76 OPREAD = 1
70 77 OPWRITE = 2
71 78
72 79 def __init__(self, fp):
73 80 object.__setattr__(self, '_fp', fp)
74 81 object.__setattr__(self, '_lastop', 0)
75 82
76 83 def __enter__(self):
77 84 self._fp.__enter__()
78 85 return self
79 86
80 87 def __exit__(self, exc_type, exc_val, exc_tb):
81 88 self._fp.__exit__(exc_type, exc_val, exc_tb)
82 89
83 90 def __getattr__(self, name):
84 91 return getattr(self._fp, name)
85 92
86 93 def __setattr__(self, name, value):
87 94 return self._fp.__setattr__(name, value)
88 95
89 96 def _noopseek(self):
90 97 self._fp.seek(0, os.SEEK_CUR)
91 98
92 99 def seek(self, *args, **kwargs):
93 100 object.__setattr__(self, '_lastop', self.OPNONE)
94 101 return self._fp.seek(*args, **kwargs)
95 102
96 103 def write(self, d):
97 104 if self._lastop == self.OPREAD:
98 105 self._noopseek()
99 106
100 107 object.__setattr__(self, '_lastop', self.OPWRITE)
101 108 return self._fp.write(d)
102 109
103 110 def writelines(self, *args, **kwargs):
104 111 if self._lastop == self.OPREAD:
105 112 self._noopeseek()
106 113
107 114 object.__setattr__(self, '_lastop', self.OPWRITE)
108 115 return self._fp.writelines(*args, **kwargs)
109 116
110 117 def read(self, *args, **kwargs):
111 118 if self._lastop == self.OPWRITE:
112 119 self._noopseek()
113 120
114 121 object.__setattr__(self, '_lastop', self.OPREAD)
115 122 return self._fp.read(*args, **kwargs)
116 123
117 124 def readline(self, *args, **kwargs):
118 125 if self._lastop == self.OPWRITE:
119 126 self._noopseek()
120 127
121 128 object.__setattr__(self, '_lastop', self.OPREAD)
122 129 return self._fp.readline(*args, **kwargs)
123 130
124 131 def readlines(self, *args, **kwargs):
125 132 if self._lastop == self.OPWRITE:
126 133 self._noopseek()
127 134
128 135 object.__setattr__(self, '_lastop', self.OPREAD)
129 136 return self._fp.readlines(*args, **kwargs)
130 137
131 138
132 139 class fdproxy:
133 140 """Wraps osutil.posixfile() to override the name attribute to reflect the
134 141 underlying file name.
135 142 """
136 143
137 144 def __init__(self, name, fp):
138 145 self.name = name
139 146 self._fp = fp
140 147
141 148 def __enter__(self):
142 149 self._fp.__enter__()
143 150 # Return this wrapper for the context manager so that the name is
144 151 # still available.
145 152 return self
146 153
147 154 def __exit__(self, exc_type, exc_value, traceback):
148 155 self._fp.__exit__(exc_type, exc_value, traceback)
149 156
150 157 def __iter__(self):
151 158 return iter(self._fp)
152 159
153 160 def __getattr__(self, name):
154 161 return getattr(self._fp, name)
155 162
156 163
157 164 def posixfile(name, mode=b'r', buffering=-1):
158 165 '''Open a file with even more POSIX-like semantics'''
159 166 try:
160 167 fp = osutil.posixfile(name, mode, buffering) # may raise WindowsError
161 168
162 169 # PyFile_FromFd() ignores the name, and seems to report fp.name as the
163 170 # underlying file descriptor.
164 171 fp = fdproxy(name, fp)
165 172
166 173 # The position when opening in append mode is implementation defined, so
167 174 # make it consistent with other platforms, which position at EOF.
168 175 if b'a' in mode:
169 176 fp.seek(0, os.SEEK_END)
170 177
171 178 if b'+' in mode:
172 179 return mixedfilemodewrapper(fp)
173 180
174 181 return fp
175 182 except WindowsError as err: # pytype: disable=name-error
176 183 # convert to a friendlier exception
177 184 raise IOError(
178 185 err.errno, '%s: %s' % (encoding.strfromlocal(name), err.strerror)
179 186 )
180 187
181 188
182 189 # may be wrapped by win32mbcs extension
183 190 listdir = osutil.listdir
184 191
185 192
186 def get_password():
193 def get_password() -> bytes:
187 194 """Prompt for password with echo off, using Windows getch().
188 195
189 196 This shouldn't be called directly- use ``ui.getpass()`` instead, which
190 197 checks if the session is interactive first.
191 198 """
192 199 pw = u""
193 200 while True:
194 201 c = msvcrt.getwch() # pytype: disable=module-attr
195 202 if c == u'\r' or c == u'\n':
196 203 break
197 204 if c == u'\003':
198 205 raise KeyboardInterrupt
199 206 if c == u'\b':
200 207 pw = pw[:-1]
201 208 else:
202 209 pw = pw + c
203 210 msvcrt.putwch(u'\r') # pytype: disable=module-attr
204 211 msvcrt.putwch(u'\n') # pytype: disable=module-attr
205 212 return encoding.unitolocal(pw)
206 213
207 214
208 215 class winstdout(typelib.BinaryIO_Proxy):
209 216 """Some files on Windows misbehave.
210 217
211 218 When writing to a broken pipe, EINVAL instead of EPIPE may be raised.
212 219
213 220 When writing too many bytes to a console at the same, a "Not enough space"
214 221 error may happen. Python 3 already works around that.
215 222 """
216 223
217 224 def __init__(self, fp: BinaryIO):
218 225 self.fp = fp
219 226
220 227 def __getattr__(self, key):
221 228 return getattr(self.fp, key)
222 229
223 230 def close(self):
224 231 try:
225 232 self.fp.close()
226 233 except IOError:
227 234 pass
228 235
229 236 def write(self, s):
230 237 try:
231 238 return self.fp.write(s)
232 239 except IOError as inst:
233 240 if inst.errno != 0 and not win32.lasterrorwaspipeerror(inst):
234 241 raise
235 242 self.close()
236 243 raise IOError(errno.EPIPE, 'Broken pipe')
237 244
238 245 def flush(self):
239 246 try:
240 247 return self.fp.flush()
241 248 except IOError as inst:
242 249 if not win32.lasterrorwaspipeerror(inst):
243 250 raise
244 251 raise IOError(errno.EPIPE, 'Broken pipe')
245 252
246 253
247 def openhardlinks():
254 def openhardlinks() -> bool:
248 255 return True
249 256
250 257
251 def parsepatchoutput(output_line):
258 def parsepatchoutput(output_line: bytes) -> bytes:
252 259 """parses the output produced by patch and returns the filename"""
253 260 pf = output_line[14:]
254 261 if pf[0] == b'`':
255 262 pf = pf[1:-1] # Remove the quotes
256 263 return pf
257 264
258 265
259 def sshargs(sshcmd, host, user, port):
266 def sshargs(
267 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
268 ) -> bytes:
260 269 '''Build argument list for ssh or Plink'''
261 270 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p'
262 271 args = user and (b"%s@%s" % (user, host)) or host
263 272 if args.startswith(b'-') or args.startswith(b'/'):
264 273 raise error.Abort(
265 274 _(b'illegal ssh hostname or username starting with - or /: %s')
266 275 % args
267 276 )
268 277 args = shellquote(args)
269 278 if port:
270 279 args = b'%s %s %s' % (pflag, shellquote(port), args)
271 280 return args
272 281
273 282
274 def setflags(f, l, x):
275 pass
276
277
278 def copymode(src, dst, mode=None, enforcewritable=False):
283 def setflags(f: bytes, l: bool, x: bool) -> None:
279 284 pass
280 285
281 286
282 def checkexec(path):
287 def copymode(
288 src: bytes,
289 dst: bytes,
290 mode: Optional[bytes] = None,
291 enforcewritable: bool = False,
292 ) -> None:
293 pass
294
295
296 def checkexec(path: bytes) -> bool:
283 297 return False
284 298
285 299
286 def checklink(path):
300 def checklink(path: bytes) -> bool:
287 301 return False
288 302
289 303
290 def setbinary(fd):
304 def setbinary(fd) -> None:
291 305 # When run without console, pipes may expose invalid
292 306 # fileno(), usually set to -1.
293 307 fno = getattr(fd, 'fileno', None)
294 308 if fno is not None and fno() >= 0:
295 309 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr
296 310
297 311
298 def pconvert(path):
312 def pconvert(path: bytes) -> bytes:
299 313 return path.replace(pycompat.ossep, b'/')
300 314
301 315
302 def localpath(path):
316 def localpath(path: bytes) -> bytes:
303 317 return path.replace(b'/', b'\\')
304 318
305 319
306 320 def normpath(path):
307 321 return pconvert(os.path.normpath(path))
308 322
309 323
310 def normcase(path):
324 def normcase(path: bytes) -> bytes:
311 325 return encoding.upper(path) # NTFS compares via upper()
312 326
313 327
314 328 DRIVE_RE_B = re.compile(b'^[a-z]:')
315 329 DRIVE_RE_S = re.compile('^[a-z]:')
316 330
317 331
318 332 def abspath(path):
319 333 abs_path = os.path.abspath(path) # re-exports
320 334 # Python on Windows is inconsistent regarding the capitalization of drive
321 335 # letter and this cause issue with various path comparison along the way.
322 336 # So we normalize the drive later to upper case here.
323 337 #
324 338 # See https://bugs.python.org/issue40368 for and example of this hell.
325 339 if isinstance(abs_path, bytes):
326 340 if DRIVE_RE_B.match(abs_path):
327 341 abs_path = abs_path[0:1].upper() + abs_path[1:]
328 342 elif DRIVE_RE_S.match(abs_path):
329 343 abs_path = abs_path[0:1].upper() + abs_path[1:]
330 344 return abs_path
331 345
332 346
333 347 # see posix.py for definitions
334 348 normcasespec = encoding.normcasespecs.upper
335 349 normcasefallback = encoding.upperfallback
336 350
337 351
338 352 def samestat(s1, s2):
339 353 return False
340 354
341 355
342 356 def shelltocmdexe(path, env):
343 357 r"""Convert shell variables in the form $var and ${var} inside ``path``
344 358 to %var% form. Existing Windows style variables are left unchanged.
345 359
346 360 The variables are limited to the given environment. Unknown variables are
347 361 left unchanged.
348 362
349 363 >>> e = {b'var1': b'v1', b'var2': b'v2', b'var3': b'v3'}
350 364 >>> # Only valid values are expanded
351 365 >>> shelltocmdexe(b'cmd $var1 ${var2} %var3% $missing ${missing} %missing%',
352 366 ... e)
353 367 'cmd %var1% %var2% %var3% $missing ${missing} %missing%'
354 368 >>> # Single quote prevents expansion, as does \$ escaping
355 369 >>> shelltocmdexe(b"cmd '$var1 ${var2} %var3%' \$var1 \${var2} \\", e)
356 370 'cmd "$var1 ${var2} %var3%" $var1 ${var2} \\'
357 371 >>> # $$ is not special. %% is not special either, but can be the end and
358 372 >>> # start of consecutive variables
359 373 >>> shelltocmdexe(b"cmd $$ %% %var1%%var2%", e)
360 374 'cmd $$ %% %var1%%var2%'
361 375 >>> # No double substitution
362 376 >>> shelltocmdexe(b"$var1 %var1%", {b'var1': b'%var2%', b'var2': b'boom'})
363 377 '%var1% %var1%'
364 378 >>> # Tilde expansion
365 379 >>> shelltocmdexe(b"~/dir ~\dir2 ~tmpfile \~/", {})
366 380 '%USERPROFILE%/dir %USERPROFILE%\\dir2 ~tmpfile ~/'
367 381 """
368 382 if not any(c in path for c in b"$'~"):
369 383 return path
370 384
371 385 varchars = pycompat.sysbytes(string.ascii_letters + string.digits) + b'_-'
372 386
373 387 res = b''
374 388 index = 0
375 389 pathlen = len(path)
376 390 while index < pathlen:
377 391 c = path[index : index + 1]
378 392 if c == b'\'': # no expansion within single quotes
379 393 path = path[index + 1 :]
380 394 pathlen = len(path)
381 395 try:
382 396 index = path.index(b'\'')
383 397 res += b'"' + path[:index] + b'"'
384 398 except ValueError:
385 399 res += c + path
386 400 index = pathlen - 1
387 401 elif c == b'%': # variable
388 402 path = path[index + 1 :]
389 403 pathlen = len(path)
390 404 try:
391 405 index = path.index(b'%')
392 406 except ValueError:
393 407 res += b'%' + path
394 408 index = pathlen - 1
395 409 else:
396 410 var = path[:index]
397 411 res += b'%' + var + b'%'
398 412 elif c == b'$': # variable
399 413 if path[index + 1 : index + 2] == b'{':
400 414 path = path[index + 2 :]
401 415 pathlen = len(path)
402 416 try:
403 417 index = path.index(b'}')
404 418 var = path[:index]
405 419
406 420 # See below for why empty variables are handled specially
407 421 if env.get(var, b'') != b'':
408 422 res += b'%' + var + b'%'
409 423 else:
410 424 res += b'${' + var + b'}'
411 425 except ValueError:
412 426 res += b'${' + path
413 427 index = pathlen - 1
414 428 else:
415 429 var = b''
416 430 index += 1
417 431 c = path[index : index + 1]
418 432 while c != b'' and c in varchars:
419 433 var += c
420 434 index += 1
421 435 c = path[index : index + 1]
422 436 # Some variables (like HG_OLDNODE) may be defined, but have an
423 437 # empty value. Those need to be skipped because when spawning
424 438 # cmd.exe to run the hook, it doesn't replace %VAR% for an empty
425 439 # VAR, and that really confuses things like revset expressions.
426 440 # OTOH, if it's left in Unix format and the hook runs sh.exe, it
427 441 # will substitute to an empty string, and everything is happy.
428 442 if env.get(var, b'') != b'':
429 443 res += b'%' + var + b'%'
430 444 else:
431 445 res += b'$' + var
432 446
433 447 if c != b'':
434 448 index -= 1
435 449 elif (
436 450 c == b'~'
437 451 and index + 1 < pathlen
438 452 and path[index + 1 : index + 2] in (b'\\', b'/')
439 453 ):
440 454 res += b"%USERPROFILE%"
441 455 elif (
442 456 c == b'\\'
443 457 and index + 1 < pathlen
444 458 and path[index + 1 : index + 2] in (b'$', b'~')
445 459 ):
446 460 # Skip '\', but only if it is escaping $ or ~
447 461 res += path[index + 1 : index + 2]
448 462 index += 1
449 463 else:
450 464 res += c
451 465
452 466 index += 1
453 467 return res
454 468
455 469
456 470 # A sequence of backslashes is special iff it precedes a double quote:
457 471 # - if there's an even number of backslashes, the double quote is not
458 472 # quoted (i.e. it ends the quoted region)
459 473 # - if there's an odd number of backslashes, the double quote is quoted
460 474 # - in both cases, every pair of backslashes is unquoted into a single
461 475 # backslash
462 476 # (See http://msdn2.microsoft.com/en-us/library/a1y7w461.aspx )
463 477 # So, to quote a string, we must surround it in double quotes, double
464 478 # the number of backslashes that precede double quotes and add another
465 479 # backslash before every double quote (being careful with the double
466 480 # quote we've appended to the end)
467 481 _quotere = None
468 482 _needsshellquote = None
469 483
470 484
471 def shellquote(s):
485 def shellquote(s: bytes) -> bytes:
472 486 r"""
473 487 >>> shellquote(br'C:\Users\xyz')
474 488 '"C:\\Users\\xyz"'
475 489 >>> shellquote(br'C:\Users\xyz/mixed')
476 490 '"C:\\Users\\xyz/mixed"'
477 491 >>> # Would be safe not to quote too, since it is all double backslashes
478 492 >>> shellquote(br'C:\\Users\\xyz')
479 493 '"C:\\\\Users\\\\xyz"'
480 494 >>> # But this must be quoted
481 495 >>> shellquote(br'C:\\Users\\xyz/abc')
482 496 '"C:\\\\Users\\\\xyz/abc"'
483 497 """
484 498 global _quotere
485 499 if _quotere is None:
486 500 _quotere = re.compile(br'(\\*)("|\\$)')
487 501 global _needsshellquote
488 502 if _needsshellquote is None:
489 503 # ":" is also treated as "safe character", because it is used as a part
490 504 # of path name on Windows. "\" is also part of a path name, but isn't
491 505 # safe because shlex.split() (kind of) treats it as an escape char and
492 506 # drops it. It will leave the next character, even if it is another
493 507 # "\".
494 508 _needsshellquote = re.compile(br'[^a-zA-Z0-9._:/-]').search
495 509 if s and not _needsshellquote(s) and not _quotere.search(s):
496 510 # "s" shouldn't have to be quoted
497 511 return s
498 512 return b'"%s"' % _quotere.sub(br'\1\1\\\2', s)
499 513
500 514
501 515 def _unquote(s):
502 516 if s.startswith(b'"') and s.endswith(b'"'):
503 517 return s[1:-1]
504 518 return s
505 519
506 520
507 def shellsplit(s):
521 def shellsplit(s: bytes) -> List[bytes]:
508 522 """Parse a command string in cmd.exe way (best-effort)"""
509 523 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False))
510 524
511 525
512 526 # if you change this stub into a real check, please try to implement the
513 527 # username and groupname functions above, too.
514 def isowner(st):
528 def isowner(st: os.stat_result) -> bool:
515 529 return True
516 530
517 531
518 def findexe(command):
532 def findexe(command: bytes) -> Optional[bytes]:
519 533 """Find executable for command searching like cmd.exe does.
520 534 If command is a basename then PATH is searched for command.
521 535 PATH isn't searched if command is an absolute or relative path.
522 536 An extension from PATHEXT is found and added if not present.
523 537 If command isn't found None is returned."""
524 538 pathext = encoding.environ.get(b'PATHEXT', b'.COM;.EXE;.BAT;.CMD')
525 539 pathexts = [ext for ext in pathext.lower().split(pycompat.ospathsep)]
526 540 if os.path.splitext(command)[1].lower() in pathexts:
527 541 pathexts = [b'']
528 542
529 def findexisting(pathcommand):
543 def findexisting(pathcommand: bytes) -> Optional[bytes]:
530 544 """Will append extension (if needed) and return existing file"""
531 545 for ext in pathexts:
532 546 executable = pathcommand + ext
533 547 if os.path.exists(executable):
534 548 return executable
535 549 return None
536 550
537 551 if pycompat.ossep in command:
538 552 return findexisting(command)
539 553
540 554 for path in encoding.environ.get(b'PATH', b'').split(pycompat.ospathsep):
541 555 executable = findexisting(os.path.join(path, command))
542 556 if executable is not None:
543 557 return executable
544 558 return findexisting(os.path.expanduser(os.path.expandvars(command)))
545 559
546 560
547 561 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
548 562
549 563
550 def statfiles(files):
564 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
551 565 """Stat each file in files. Yield each stat, or None if a file
552 566 does not exist or has a type we don't care about.
553 567
554 568 Cluster and cache stat per directory to minimize number of OS stat calls."""
555 569 dircache = {} # dirname -> filename -> status | None if file does not exist
556 570 getkind = stat.S_IFMT
557 571 for nf in files:
558 572 nf = normcase(nf)
559 573 dir, base = os.path.split(nf)
560 574 if not dir:
561 575 dir = b'.'
562 576 cache = dircache.get(dir, None)
563 577 if cache is None:
564 578 try:
565 579 dmap = {
566 580 normcase(n): s
567 581 for n, k, s in listdir(dir, True)
568 582 if getkind(s.st_mode) in _wantedkinds
569 583 }
570 584 except (FileNotFoundError, NotADirectoryError):
571 585 dmap = {}
572 586 cache = dircache.setdefault(dir, dmap)
573 587 yield cache.get(base, None)
574 588
575 589
576 def username(uid=None):
590 def username(uid: Optional[int] = None) -> Optional[bytes]:
577 591 """Return the name of the user with the given uid.
578 592
579 593 If uid is None, return the name of the current user."""
580 594 if not uid:
581 595 try:
582 596 return pycompat.fsencode(getpass.getuser())
583 597 except ModuleNotFoundError:
584 598 # getpass.getuser() checks for a few environment variables first,
585 599 # but if those aren't set, imports pwd and calls getpwuid(), none of
586 600 # which exists on Windows.
587 601 pass
588 602 return None
589 603
590 604
591 def groupname(gid=None):
605 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
592 606 """Return the name of the group with the given gid.
593 607
594 608 If gid is None, return the name of the current group."""
595 609 return None
596 610
597 611
598 612 def readlink(pathname):
599 613 path = pycompat.fsdecode(pathname)
600 614 try:
601 615 link = os.readlink(path)
602 616 except ValueError as e:
603 617 # On py2, os.readlink() raises an AttributeError since it is
604 618 # unsupported. On py3, reading a non-link raises a ValueError. Simply
605 619 # treat this as the error the locking code has been expecting up to now
606 620 # until an effort can be made to enable symlink support on Windows.
607 621 raise AttributeError(e)
608 622 return pycompat.fsencode(link)
609 623
610 624
611 625 def removedirs(name):
612 626 """special version of os.removedirs that does not remove symlinked
613 627 directories or junction points if they actually contain files"""
614 628 if listdir(name):
615 629 return
616 630 os.rmdir(name)
617 631 head, tail = os.path.split(name)
618 632 if not tail:
619 633 head, tail = os.path.split(head)
620 634 while head and tail:
621 635 try:
622 636 if listdir(head):
623 637 return
624 638 os.rmdir(head)
625 639 except (ValueError, OSError):
626 640 break
627 641 head, tail = os.path.split(head)
628 642
629 643
630 644 def rename(src, dst):
631 645 '''atomically rename file src to dst, replacing dst if it exists'''
632 646 try:
633 647 os.rename(src, dst)
634 648 except FileExistsError:
635 649 unlink(dst)
636 650 os.rename(src, dst)
637 651
638 652
639 653 def gethgcmd():
640 654 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]]
641 655
642 656
643 def groupmembers(name):
657 def groupmembers(name: bytes) -> List[bytes]:
644 658 # Don't support groups on Windows for now
645 659 raise KeyError
646 660
647 661
648 def isexec(f):
662 def isexec(f: bytes) -> bool:
649 663 return False
650 664
651 665
652 666 class cachestat:
653 667 def __init__(self, path):
654 668 pass
655 669
656 670 def cacheable(self):
657 671 return False
658 672
659 673
660 def lookupreg(key, valname=None, scope=None):
674 def lookupreg(
675 key: bytes,
676 valname: Optional[bytes] = None,
677 scope: Optional[Union[int, Iterable[int]]] = None,
678 ) -> Optional[bytes]:
661 679 """Look up a key/value name in the Windows registry.
662 680
663 681 valname: value name. If unspecified, the default value for the key
664 682 is used.
665 683 scope: optionally specify scope for registry lookup, this can be
666 684 a sequence of scopes to look up in order. Default (CURRENT_USER,
667 685 LOCAL_MACHINE).
668 686 """
669 687 if scope is None:
670 688 # pytype: disable=module-attr
671 689 scope = (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE)
672 690 # pytype: enable=module-attr
673 691 elif not isinstance(scope, (list, tuple)):
674 692 scope = (scope,)
675 693 for s in scope:
676 694 try:
677 695 # pytype: disable=module-attr
678 696 with winreg.OpenKey(s, encoding.strfromlocal(key)) as hkey:
679 697 # pytype: enable=module-attr
680 698 name = None
681 699 if valname is not None:
682 700 name = encoding.strfromlocal(valname)
683 701 # pytype: disable=module-attr
684 702 val = winreg.QueryValueEx(hkey, name)[0]
685 703 # pytype: enable=module-attr
686 704
687 705 # never let a Unicode string escape into the wild
688 706 return encoding.unitolocal(val)
689 707 except EnvironmentError:
690 708 pass
691 709
692 710
693 711 expandglobs = True
694 712
695 713
696 def statislink(st):
714 def statislink(st: Optional[os.stat_result]) -> bool:
697 715 '''check whether a stat result is a symlink'''
698 716 return False
699 717
700 718
701 def statisexec(st):
719 def statisexec(st: Optional[os.stat_result]) -> bool:
702 720 '''check whether a stat result is an executable file'''
703 721 return False
704 722
705 723
706 724 def poll(fds):
707 725 # see posix.py for description
708 726 raise NotImplementedError()
709 727
710 728
711 def readpipe(pipe):
729 def readpipe(pipe) -> bytes:
712 730 """Read all available data from a pipe."""
713 731 chunks = []
714 732 while True:
715 733 size = win32.peekpipe(pipe)
716 734 if not size:
717 735 break
718 736
719 737 s = pipe.read(size)
720 738 if not s:
721 739 break
722 740 chunks.append(s)
723 741
724 742 return b''.join(chunks)
725 743
726 744
727 def bindunixsocket(sock, path):
745 def bindunixsocket(sock, path: bytes) -> NoReturn:
728 746 raise NotImplementedError('unsupported platform')
General Comments 0
You need to be logged in to leave comments. Login now