##// END OF EJS Templates
typing: add type hints to the common posix/windows platform functions...
Matt Harbison -
r50707:58dff81f default
parent child Browse files
Show More
@@ -20,9 +20,13 b' import tempfile'
20 20 import unicodedata
21 21
22 22 from typing import (
23 Iterable,
24 Iterator,
23 25 List,
24 26 NoReturn,
25 27 Optional,
28 Sequence,
29 Union,
26 30 )
27 31
28 32 from .i18n import _
@@ -91,7 +95,7 b' def split(p):'
91 95 return ht[0] + b'/', ht[1]
92 96
93 97
94 def openhardlinks():
98 def openhardlinks() -> bool:
95 99 '''return true if it is safe to hold open file handles to hardlinks'''
96 100 return True
97 101
@@ -101,7 +105,7 b' def nlinks(name: bytes) -> int:'
101 105 return os.lstat(name).st_nlink
102 106
103 107
104 def parsepatchoutput(output_line):
108 def parsepatchoutput(output_line: bytes) -> bytes:
105 109 """parses the output produced by patch and returns the filename"""
106 110 pf = output_line[14:]
107 111 if pycompat.sysplatform == b'OpenVMS':
@@ -113,7 +117,9 b' def parsepatchoutput(output_line):'
113 117 return pf
114 118
115 119
116 def sshargs(sshcmd, host, user, port):
120 def sshargs(
121 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
122 ) -> bytes:
117 123 '''Build argument list for ssh'''
118 124 args = user and (b"%s@%s" % (user, host)) or host
119 125 if b'-' in args[:1]:
@@ -126,12 +132,12 b' def sshargs(sshcmd, host, user, port):'
126 132 return args
127 133
128 134
129 def isexec(f):
135 def isexec(f: bytes) -> bool:
130 136 """check whether a file is executable"""
131 137 return os.lstat(f).st_mode & 0o100 != 0
132 138
133 139
134 def setflags(f, l, x):
140 def setflags(f: bytes, l: bool, x: bool) -> None:
135 141 st = os.lstat(f)
136 142 s = st.st_mode
137 143 if l:
@@ -175,7 +181,12 b' def setflags(f, l, x):'
175 181 os.chmod(f, s & 0o666)
176 182
177 183
178 def copymode(src, dst, mode=None, enforcewritable=False):
184 def copymode(
185 src: bytes,
186 dst: bytes,
187 mode: Optional[bytes] = None,
188 enforcewritable: bool = False,
189 ) -> None:
179 190 """Copy the file mode from the file at path src to dst.
180 191 If src doesn't exist, we're using mode instead. If mode is None, we're
181 192 using umask."""
@@ -195,7 +206,7 b' def copymode(src, dst, mode=None, enforc'
195 206 os.chmod(dst, new_mode)
196 207
197 208
198 def checkexec(path):
209 def checkexec(path: bytes) -> bool:
199 210 """
200 211 Check whether the given path is on a filesystem with UNIX-like exec flags
201 212
@@ -275,7 +286,7 b' def checkexec(path):'
275 286 return False
276 287
277 288
278 def checklink(path):
289 def checklink(path: bytes) -> bool:
279 290 """check whether the given path is on a symlink-capable filesystem"""
280 291 # mktemp is not racy because symlink creation will fail if the
281 292 # file already exists
@@ -362,19 +373,19 b' def getfstype(dirpath: bytes) -> Optiona'
362 373 return getattr(osutil, 'getfstype', lambda x: None)(dirpath)
363 374
364 375
365 def get_password():
376 def get_password() -> bytes:
366 377 return encoding.strtolocal(getpass.getpass(''))
367 378
368 379
369 def setbinary(fd):
380 def setbinary(fd) -> None:
370 381 pass
371 382
372 383
373 def pconvert(path):
384 def pconvert(path: bytes) -> bytes:
374 385 return path
375 386
376 387
377 def localpath(path):
388 def localpath(path: bytes) -> bytes:
378 389 return path
379 390
380 391
@@ -393,7 +404,7 b' def samedevice(fpath1: bytes, fpath2: by'
393 404
394 405
395 406 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems
396 def normcase(path):
407 def normcase(path: bytes) -> bytes:
397 408 return path.lower()
398 409
399 410
@@ -404,7 +415,7 b' normcasefallback = normcase'
404 415
405 416 if pycompat.isdarwin:
406 417
407 def normcase(path):
418 def normcase(path: bytes) -> bytes:
408 419 """
409 420 Normalize a filename for OS X-compatible comparison:
410 421 - escape-encode invalid characters
@@ -429,7 +440,7 b' if pycompat.isdarwin:'
429 440
430 441 normcasespec = encoding.normcasespecs.lower
431 442
432 def normcasefallback(path):
443 def normcasefallback(path: bytes) -> bytes:
433 444 try:
434 445 u = path.decode('utf-8')
435 446 except UnicodeDecodeError:
@@ -470,7 +481,7 b" if pycompat.sysplatform == b'cygwin':"
470 481 )
471 482
472 483 # use upper-ing as normcase as same as NTFS workaround
473 def normcase(path):
484 def normcase(path: bytes) -> bytes:
474 485 pathlen = len(path)
475 486 if (pathlen == 0) or (path[0] != pycompat.ossep):
476 487 # treat as relative
@@ -496,20 +507,20 b" if pycompat.sysplatform == b'cygwin':"
496 507 # but these translations are not supported by native
497 508 # tools, so the exec bit tends to be set erroneously.
498 509 # Therefore, disable executable bit access on Cygwin.
499 def checkexec(path):
510 def checkexec(path: bytes) -> bool:
500 511 return False
501 512
502 513 # Similarly, Cygwin's symlink emulation is likely to create
503 514 # problems when Mercurial is used from both Cygwin and native
504 515 # Windows, with other native tools, or on shared volumes
505 def checklink(path):
516 def checklink(path: bytes) -> bool:
506 517 return False
507 518
508 519
509 520 _needsshellquote = None
510 521
511 522
512 def shellquote(s):
523 def shellquote(s: bytes) -> bytes:
513 524 if pycompat.sysplatform == b'OpenVMS':
514 525 return b'"%s"' % s
515 526 global _needsshellquote
@@ -522,7 +533,7 b' def shellquote(s):'
522 533 return b"'%s'" % s.replace(b"'", b"'\\''")
523 534
524 535
525 def shellsplit(s):
536 def shellsplit(s: bytes) -> List[bytes]:
526 537 """Parse a command string in POSIX shell way (best-effort)"""
527 538 return pycompat.shlexsplit(s, posix=True)
528 539
@@ -538,12 +549,12 b' def testpid(pid: int) -> bool:'
538 549 return inst.errno != errno.ESRCH
539 550
540 551
541 def isowner(st):
552 def isowner(st: os.stat_result) -> bool:
542 553 """Return True if the stat object st is from the current user."""
543 554 return st.st_uid == os.getuid()
544 555
545 556
546 def findexe(command):
557 def findexe(command: bytes) -> Optional[bytes]:
547 558 """Find executable for command searching like which does.
548 559 If command is a basename then PATH is searched for command.
549 560 PATH isn't searched if command is an absolute or relative path.
@@ -551,7 +562,7 b' def findexe(command):'
551 562 if pycompat.sysplatform == b'OpenVMS':
552 563 return command
553 564
554 def findexisting(executable):
565 def findexisting(executable: bytes) -> Optional[bytes]:
555 566 b'Will return executable if existing file'
556 567 if os.path.isfile(executable) and os.access(executable, os.X_OK):
557 568 return executable
@@ -577,7 +588,7 b' def setsignalhandler() -> None:'
577 588 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
578 589
579 590
580 def statfiles(files):
591 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
581 592 """Stat each file in files. Yield each stat, or None if a file does not
582 593 exist or has a type we don't care about."""
583 594 lstat = os.lstat
@@ -597,7 +608,7 b' def getuser() -> bytes:'
597 608 return pycompat.fsencode(getpass.getuser())
598 609
599 610
600 def username(uid=None):
611 def username(uid: Optional[int] = None) -> Optional[bytes]:
601 612 """Return the name of the user with the given uid.
602 613
603 614 If uid is None, return the name of the current user."""
@@ -610,7 +621,7 b' def username(uid=None):'
610 621 return b'%d' % uid
611 622
612 623
613 def groupname(gid=None):
624 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
614 625 """Return the name of the group with the given gid.
615 626
616 627 If gid is None, return the name of the current group."""
@@ -623,7 +634,7 b' def groupname(gid=None):'
623 634 return pycompat.bytestr(gid)
624 635
625 636
626 def groupmembers(name):
637 def groupmembers(name: bytes) -> List[bytes]:
627 638 """Return the list of members of the group with the given
628 639 name, KeyError if the group does not exist.
629 640 """
@@ -643,7 +654,11 b' def makedir(path: bytes, notindexed: boo'
643 654 os.mkdir(path)
644 655
645 656
646 def lookupreg(key, name=None, scope=None):
657 def lookupreg(
658 key: bytes,
659 name: Optional[bytes] = None,
660 scope: Optional[Union[int, Iterable[int]]] = None,
661 ) -> Optional[bytes]:
647 662 return None
648 663
649 664
@@ -690,14 +705,14 b' class cachestat:'
690 705 return not self == other
691 706
692 707
693 def statislink(st):
708 def statislink(st: Optional[os.stat_result]) -> bool:
694 709 '''check whether a stat result is a symlink'''
695 return st and stat.S_ISLNK(st.st_mode)
710 return stat.S_ISLNK(st.st_mode) if st else False
696 711
697 712
698 def statisexec(st):
713 def statisexec(st: Optional[os.stat_result]) -> bool:
699 714 '''check whether a stat result is an executable file'''
700 return st and (st.st_mode & 0o100 != 0)
715 return (st.st_mode & 0o100 != 0) if st else False
701 716
702 717
703 718 def poll(fds):
@@ -714,7 +729,7 b' def poll(fds):'
714 729 return sorted(list(set(sum(res, []))))
715 730
716 731
717 def readpipe(pipe):
732 def readpipe(pipe) -> bytes:
718 733 """Read all available data from a pipe."""
719 734 # We can't fstat() a pipe because Linux will always report 0.
720 735 # So, we set the pipe to non-blocking mode and read everything
@@ -739,7 +754,7 b' def readpipe(pipe):'
739 754 fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
740 755
741 756
742 def bindunixsocket(sock, path):
757 def bindunixsocket(sock, path: bytes) -> None:
743 758 """Bind the UNIX domain socket to the specified path"""
744 759 # use relative path instead of full path at bind() if possible, since
745 760 # AF_UNIX path has very small length limit (107 chars) on common
@@ -18,6 +18,13 b' import winreg # pytype: disable=import-'
18 18
19 19 from typing import (
20 20 BinaryIO,
21 Iterable,
22 Iterator,
23 List,
24 NoReturn,
25 Optional,
26 Sequence,
27 Union,
21 28 )
22 29
23 30 from .i18n import _
@@ -183,7 +190,7 b" def posixfile(name, mode=b'r', buffering"
183 190 listdir = osutil.listdir
184 191
185 192
186 def get_password():
193 def get_password() -> bytes:
187 194 """Prompt for password with echo off, using Windows getch().
188 195
189 196 This shouldn't be called directly- use ``ui.getpass()`` instead, which
@@ -244,11 +251,11 b' class winstdout(typelib.BinaryIO_Proxy):'
244 251 raise IOError(errno.EPIPE, 'Broken pipe')
245 252
246 253
247 def openhardlinks():
254 def openhardlinks() -> bool:
248 255 return True
249 256
250 257
251 def parsepatchoutput(output_line):
258 def parsepatchoutput(output_line: bytes) -> bytes:
252 259 """parses the output produced by patch and returns the filename"""
253 260 pf = output_line[14:]
254 261 if pf[0] == b'`':
@@ -256,7 +263,9 b' def parsepatchoutput(output_line):'
256 263 return pf
257 264
258 265
259 def sshargs(sshcmd, host, user, port):
266 def sshargs(
267 sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
268 ) -> bytes:
260 269 '''Build argument list for ssh or Plink'''
261 270 pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p'
262 271 args = user and (b"%s@%s" % (user, host)) or host
@@ -271,23 +280,28 b' def sshargs(sshcmd, host, user, port):'
271 280 return args
272 281
273 282
274 def setflags(f, l, x):
275 pass
276
277
278 def copymode(src, dst, mode=None, enforcewritable=False):
283 def setflags(f: bytes, l: bool, x: bool) -> None:
279 284 pass
280 285
281 286
282 def checkexec(path):
287 def copymode(
288 src: bytes,
289 dst: bytes,
290 mode: Optional[bytes] = None,
291 enforcewritable: bool = False,
292 ) -> None:
293 pass
294
295
296 def checkexec(path: bytes) -> bool:
283 297 return False
284 298
285 299
286 def checklink(path):
300 def checklink(path: bytes) -> bool:
287 301 return False
288 302
289 303
290 def setbinary(fd):
304 def setbinary(fd) -> None:
291 305 # When run without console, pipes may expose invalid
292 306 # fileno(), usually set to -1.
293 307 fno = getattr(fd, 'fileno', None)
@@ -295,11 +309,11 b' def setbinary(fd):'
295 309 msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr
296 310
297 311
298 def pconvert(path):
312 def pconvert(path: bytes) -> bytes:
299 313 return path.replace(pycompat.ossep, b'/')
300 314
301 315
302 def localpath(path):
316 def localpath(path: bytes) -> bytes:
303 317 return path.replace(b'/', b'\\')
304 318
305 319
@@ -307,7 +321,7 b' def normpath(path):'
307 321 return pconvert(os.path.normpath(path))
308 322
309 323
310 def normcase(path):
324 def normcase(path: bytes) -> bytes:
311 325 return encoding.upper(path) # NTFS compares via upper()
312 326
313 327
@@ -468,7 +482,7 b' def shelltocmdexe(path, env):'
468 482 _needsshellquote = None
469 483
470 484
471 def shellquote(s):
485 def shellquote(s: bytes) -> bytes:
472 486 r"""
473 487 >>> shellquote(br'C:\Users\xyz')
474 488 '"C:\\Users\\xyz"'
@@ -504,18 +518,18 b' def _unquote(s):'
504 518 return s
505 519
506 520
507 def shellsplit(s):
521 def shellsplit(s: bytes) -> List[bytes]:
508 522 """Parse a command string in cmd.exe way (best-effort)"""
509 523 return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False))
510 524
511 525
512 526 # if you change this stub into a real check, please try to implement the
513 527 # username and groupname functions above, too.
514 def isowner(st):
528 def isowner(st: os.stat_result) -> bool:
515 529 return True
516 530
517 531
518 def findexe(command):
532 def findexe(command: bytes) -> Optional[bytes]:
519 533 """Find executable for command searching like cmd.exe does.
520 534 If command is a basename then PATH is searched for command.
521 535 PATH isn't searched if command is an absolute or relative path.
@@ -526,7 +540,7 b' def findexe(command):'
526 540 if os.path.splitext(command)[1].lower() in pathexts:
527 541 pathexts = [b'']
528 542
529 def findexisting(pathcommand):
543 def findexisting(pathcommand: bytes) -> Optional[bytes]:
530 544 """Will append extension (if needed) and return existing file"""
531 545 for ext in pathexts:
532 546 executable = pathcommand + ext
@@ -547,7 +561,7 b' def findexe(command):'
547 561 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
548 562
549 563
550 def statfiles(files):
564 def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
551 565 """Stat each file in files. Yield each stat, or None if a file
552 566 does not exist or has a type we don't care about.
553 567
@@ -573,7 +587,7 b' def statfiles(files):'
573 587 yield cache.get(base, None)
574 588
575 589
576 def username(uid=None):
590 def username(uid: Optional[int] = None) -> Optional[bytes]:
577 591 """Return the name of the user with the given uid.
578 592
579 593 If uid is None, return the name of the current user."""
@@ -588,7 +602,7 b' def username(uid=None):'
588 602 return None
589 603
590 604
591 def groupname(gid=None):
605 def groupname(gid: Optional[int] = None) -> Optional[bytes]:
592 606 """Return the name of the group with the given gid.
593 607
594 608 If gid is None, return the name of the current group."""
@@ -640,12 +654,12 b' def gethgcmd():'
640 654 return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]]
641 655
642 656
643 def groupmembers(name):
657 def groupmembers(name: bytes) -> List[bytes]:
644 658 # Don't support groups on Windows for now
645 659 raise KeyError
646 660
647 661
648 def isexec(f):
662 def isexec(f: bytes) -> bool:
649 663 return False
650 664
651 665
@@ -657,7 +671,11 b' class cachestat:'
657 671 return False
658 672
659 673
660 def lookupreg(key, valname=None, scope=None):
674 def lookupreg(
675 key: bytes,
676 valname: Optional[bytes] = None,
677 scope: Optional[Union[int, Iterable[int]]] = None,
678 ) -> Optional[bytes]:
661 679 """Look up a key/value name in the Windows registry.
662 680
663 681 valname: value name. If unspecified, the default value for the key
@@ -693,12 +711,12 b' def lookupreg(key, valname=None, scope=N'
693 711 expandglobs = True
694 712
695 713
696 def statislink(st):
714 def statislink(st: Optional[os.stat_result]) -> bool:
697 715 '''check whether a stat result is a symlink'''
698 716 return False
699 717
700 718
701 def statisexec(st):
719 def statisexec(st: Optional[os.stat_result]) -> bool:
702 720 '''check whether a stat result is an executable file'''
703 721 return False
704 722
@@ -708,7 +726,7 b' def poll(fds):'
708 726 raise NotImplementedError()
709 727
710 728
711 def readpipe(pipe):
729 def readpipe(pipe) -> bytes:
712 730 """Read all available data from a pipe."""
713 731 chunks = []
714 732 while True:
@@ -724,5 +742,5 b' def readpipe(pipe):'
724 742 return b''.join(chunks)
725 743
726 744
727 def bindunixsocket(sock, path):
745 def bindunixsocket(sock, path: bytes) -> NoReturn:
728 746 raise NotImplementedError('unsupported platform')
General Comments 0
You need to be logged in to leave comments. Login now