# HG changeset patch # User Matt Harbison # Date 2022-12-16 05:54:39 # Node ID 58dff81ffba126eb20939de74afed19e59122877 # Parent 0a91aba258e0a597d6f6c9ca1dafe58847e4e877 typing: add type hints to the common posix/windows platform functions These are done in sync because some platforms have empty implementations, and it isn't obvious what the types should be without examining the other. We want the types aligned, so @overload definitions that differ aren't generated. The only differences here are the few methods that unconditionally raise an error are marked as `NoReturn`, which doesn't seem to bother pytype. A couple of the posix module functions needed to be updated with a modern ternary operator, because pytype seems to want to use the type of the second object in the old `return x and y` style. diff --git a/mercurial/posix.py b/mercurial/posix.py --- a/mercurial/posix.py +++ b/mercurial/posix.py @@ -20,9 +20,13 @@ import tempfile import unicodedata from typing import ( + Iterable, + Iterator, List, NoReturn, Optional, + Sequence, + Union, ) from .i18n import _ @@ -91,7 +95,7 @@ def split(p): return ht[0] + b'/', ht[1] -def openhardlinks(): +def openhardlinks() -> bool: '''return true if it is safe to hold open file handles to hardlinks''' return True @@ -101,7 +105,7 @@ def nlinks(name: bytes) -> int: return os.lstat(name).st_nlink -def parsepatchoutput(output_line): +def parsepatchoutput(output_line: bytes) -> bytes: """parses the output produced by patch and returns the filename""" pf = output_line[14:] if pycompat.sysplatform == b'OpenVMS': @@ -113,7 +117,9 @@ def parsepatchoutput(output_line): return pf -def sshargs(sshcmd, host, user, port): +def sshargs( + sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] +) -> bytes: '''Build argument list for ssh''' args = user and (b"%s@%s" % (user, host)) or host if b'-' in args[:1]: @@ -126,12 +132,12 @@ def sshargs(sshcmd, host, user, port): return args -def isexec(f): +def isexec(f: bytes) -> bool: """check whether a file is executable""" return os.lstat(f).st_mode & 0o100 != 0 -def setflags(f, l, x): +def setflags(f: bytes, l: bool, x: bool) -> None: st = os.lstat(f) s = st.st_mode if l: @@ -175,7 +181,12 @@ def setflags(f, l, x): os.chmod(f, s & 0o666) -def copymode(src, dst, mode=None, enforcewritable=False): +def copymode( + src: bytes, + dst: bytes, + mode: Optional[bytes] = None, + enforcewritable: bool = False, +) -> None: """Copy the file mode from the file at path src to dst. If src doesn't exist, we're using mode instead. If mode is None, we're using umask.""" @@ -195,7 +206,7 @@ def copymode(src, dst, mode=None, enforc os.chmod(dst, new_mode) -def checkexec(path): +def checkexec(path: bytes) -> bool: """ Check whether the given path is on a filesystem with UNIX-like exec flags @@ -275,7 +286,7 @@ def checkexec(path): return False -def checklink(path): +def checklink(path: bytes) -> bool: """check whether the given path is on a symlink-capable filesystem""" # mktemp is not racy because symlink creation will fail if the # file already exists @@ -362,19 +373,19 @@ def getfstype(dirpath: bytes) -> Optiona return getattr(osutil, 'getfstype', lambda x: None)(dirpath) -def get_password(): +def get_password() -> bytes: return encoding.strtolocal(getpass.getpass('')) -def setbinary(fd): +def setbinary(fd) -> None: pass -def pconvert(path): +def pconvert(path: bytes) -> bytes: return path -def localpath(path): +def localpath(path: bytes) -> bytes: return path @@ -393,7 +404,7 @@ def samedevice(fpath1: bytes, fpath2: by # os.path.normcase is a no-op, which doesn't help us on non-native filesystems -def normcase(path): +def normcase(path: bytes) -> bytes: return path.lower() @@ -404,7 +415,7 @@ normcasefallback = normcase if pycompat.isdarwin: - def normcase(path): + def normcase(path: bytes) -> bytes: """ Normalize a filename for OS X-compatible comparison: - escape-encode invalid characters @@ -429,7 +440,7 @@ if pycompat.isdarwin: normcasespec = encoding.normcasespecs.lower - def normcasefallback(path): + def normcasefallback(path: bytes) -> bytes: try: u = path.decode('utf-8') except UnicodeDecodeError: @@ -470,7 +481,7 @@ if pycompat.sysplatform == b'cygwin': ) # use upper-ing as normcase as same as NTFS workaround - def normcase(path): + def normcase(path: bytes) -> bytes: pathlen = len(path) if (pathlen == 0) or (path[0] != pycompat.ossep): # treat as relative @@ -496,20 +507,20 @@ if pycompat.sysplatform == b'cygwin': # but these translations are not supported by native # tools, so the exec bit tends to be set erroneously. # Therefore, disable executable bit access on Cygwin. - def checkexec(path): + def checkexec(path: bytes) -> bool: return False # Similarly, Cygwin's symlink emulation is likely to create # problems when Mercurial is used from both Cygwin and native # Windows, with other native tools, or on shared volumes - def checklink(path): + def checklink(path: bytes) -> bool: return False _needsshellquote = None -def shellquote(s): +def shellquote(s: bytes) -> bytes: if pycompat.sysplatform == b'OpenVMS': return b'"%s"' % s global _needsshellquote @@ -522,7 +533,7 @@ def shellquote(s): return b"'%s'" % s.replace(b"'", b"'\\''") -def shellsplit(s): +def shellsplit(s: bytes) -> List[bytes]: """Parse a command string in POSIX shell way (best-effort)""" return pycompat.shlexsplit(s, posix=True) @@ -538,12 +549,12 @@ def testpid(pid: int) -> bool: return inst.errno != errno.ESRCH -def isowner(st): +def isowner(st: os.stat_result) -> bool: """Return True if the stat object st is from the current user.""" return st.st_uid == os.getuid() -def findexe(command): +def findexe(command: bytes) -> Optional[bytes]: """Find executable for command searching like which does. If command is a basename then PATH is searched for command. PATH isn't searched if command is an absolute or relative path. @@ -551,7 +562,7 @@ def findexe(command): if pycompat.sysplatform == b'OpenVMS': return command - def findexisting(executable): + def findexisting(executable: bytes) -> Optional[bytes]: b'Will return executable if existing file' if os.path.isfile(executable) and os.access(executable, os.X_OK): return executable @@ -577,7 +588,7 @@ def setsignalhandler() -> None: _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} -def statfiles(files): +def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: """Stat each file in files. Yield each stat, or None if a file does not exist or has a type we don't care about.""" lstat = os.lstat @@ -597,7 +608,7 @@ def getuser() -> bytes: return pycompat.fsencode(getpass.getuser()) -def username(uid=None): +def username(uid: Optional[int] = None) -> Optional[bytes]: """Return the name of the user with the given uid. If uid is None, return the name of the current user.""" @@ -610,7 +621,7 @@ def username(uid=None): return b'%d' % uid -def groupname(gid=None): +def groupname(gid: Optional[int] = None) -> Optional[bytes]: """Return the name of the group with the given gid. If gid is None, return the name of the current group.""" @@ -623,7 +634,7 @@ def groupname(gid=None): return pycompat.bytestr(gid) -def groupmembers(name): +def groupmembers(name: bytes) -> List[bytes]: """Return the list of members of the group with the given name, KeyError if the group does not exist. """ @@ -643,7 +654,11 @@ def makedir(path: bytes, notindexed: boo os.mkdir(path) -def lookupreg(key, name=None, scope=None): +def lookupreg( + key: bytes, + name: Optional[bytes] = None, + scope: Optional[Union[int, Iterable[int]]] = None, +) -> Optional[bytes]: return None @@ -690,14 +705,14 @@ class cachestat: return not self == other -def statislink(st): +def statislink(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is a symlink''' - return st and stat.S_ISLNK(st.st_mode) + return stat.S_ISLNK(st.st_mode) if st else False -def statisexec(st): +def statisexec(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is an executable file''' - return st and (st.st_mode & 0o100 != 0) + return (st.st_mode & 0o100 != 0) if st else False def poll(fds): @@ -714,7 +729,7 @@ def poll(fds): return sorted(list(set(sum(res, [])))) -def readpipe(pipe): +def readpipe(pipe) -> bytes: """Read all available data from a pipe.""" # We can't fstat() a pipe because Linux will always report 0. # So, we set the pipe to non-blocking mode and read everything @@ -739,7 +754,7 @@ def readpipe(pipe): fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags) -def bindunixsocket(sock, path): +def bindunixsocket(sock, path: bytes) -> None: """Bind the UNIX domain socket to the specified path""" # use relative path instead of full path at bind() if possible, since # AF_UNIX path has very small length limit (107 chars) on common diff --git a/mercurial/windows.py b/mercurial/windows.py --- a/mercurial/windows.py +++ b/mercurial/windows.py @@ -18,6 +18,13 @@ import winreg # pytype: disable=import- from typing import ( BinaryIO, + Iterable, + Iterator, + List, + NoReturn, + Optional, + Sequence, + Union, ) from .i18n import _ @@ -183,7 +190,7 @@ def posixfile(name, mode=b'r', buffering listdir = osutil.listdir -def get_password(): +def get_password() -> bytes: """Prompt for password with echo off, using Windows getch(). This shouldn't be called directly- use ``ui.getpass()`` instead, which @@ -244,11 +251,11 @@ class winstdout(typelib.BinaryIO_Proxy): raise IOError(errno.EPIPE, 'Broken pipe') -def openhardlinks(): +def openhardlinks() -> bool: return True -def parsepatchoutput(output_line): +def parsepatchoutput(output_line: bytes) -> bytes: """parses the output produced by patch and returns the filename""" pf = output_line[14:] if pf[0] == b'`': @@ -256,7 +263,9 @@ def parsepatchoutput(output_line): return pf -def sshargs(sshcmd, host, user, port): +def sshargs( + sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] +) -> bytes: '''Build argument list for ssh or Plink''' pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p' args = user and (b"%s@%s" % (user, host)) or host @@ -271,23 +280,28 @@ def sshargs(sshcmd, host, user, port): return args -def setflags(f, l, x): - pass - - -def copymode(src, dst, mode=None, enforcewritable=False): +def setflags(f: bytes, l: bool, x: bool) -> None: pass -def checkexec(path): +def copymode( + src: bytes, + dst: bytes, + mode: Optional[bytes] = None, + enforcewritable: bool = False, +) -> None: + pass + + +def checkexec(path: bytes) -> bool: return False -def checklink(path): +def checklink(path: bytes) -> bool: return False -def setbinary(fd): +def setbinary(fd) -> None: # When run without console, pipes may expose invalid # fileno(), usually set to -1. fno = getattr(fd, 'fileno', None) @@ -295,11 +309,11 @@ def setbinary(fd): msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr -def pconvert(path): +def pconvert(path: bytes) -> bytes: return path.replace(pycompat.ossep, b'/') -def localpath(path): +def localpath(path: bytes) -> bytes: return path.replace(b'/', b'\\') @@ -307,7 +321,7 @@ def normpath(path): return pconvert(os.path.normpath(path)) -def normcase(path): +def normcase(path: bytes) -> bytes: return encoding.upper(path) # NTFS compares via upper() @@ -468,7 +482,7 @@ def shelltocmdexe(path, env): _needsshellquote = None -def shellquote(s): +def shellquote(s: bytes) -> bytes: r""" >>> shellquote(br'C:\Users\xyz') '"C:\\Users\\xyz"' @@ -504,18 +518,18 @@ def _unquote(s): return s -def shellsplit(s): +def shellsplit(s: bytes) -> List[bytes]: """Parse a command string in cmd.exe way (best-effort)""" return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False)) # if you change this stub into a real check, please try to implement the # username and groupname functions above, too. -def isowner(st): +def isowner(st: os.stat_result) -> bool: return True -def findexe(command): +def findexe(command: bytes) -> Optional[bytes]: """Find executable for command searching like cmd.exe does. If command is a basename then PATH is searched for command. PATH isn't searched if command is an absolute or relative path. @@ -526,7 +540,7 @@ def findexe(command): if os.path.splitext(command)[1].lower() in pathexts: pathexts = [b''] - def findexisting(pathcommand): + def findexisting(pathcommand: bytes) -> Optional[bytes]: """Will append extension (if needed) and return existing file""" for ext in pathexts: executable = pathcommand + ext @@ -547,7 +561,7 @@ def findexe(command): _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} -def statfiles(files): +def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: """Stat each file in files. Yield each stat, or None if a file does not exist or has a type we don't care about. @@ -573,7 +587,7 @@ def statfiles(files): yield cache.get(base, None) -def username(uid=None): +def username(uid: Optional[int] = None) -> Optional[bytes]: """Return the name of the user with the given uid. If uid is None, return the name of the current user.""" @@ -588,7 +602,7 @@ def username(uid=None): return None -def groupname(gid=None): +def groupname(gid: Optional[int] = None) -> Optional[bytes]: """Return the name of the group with the given gid. If gid is None, return the name of the current group.""" @@ -640,12 +654,12 @@ def gethgcmd(): return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]] -def groupmembers(name): +def groupmembers(name: bytes) -> List[bytes]: # Don't support groups on Windows for now raise KeyError -def isexec(f): +def isexec(f: bytes) -> bool: return False @@ -657,7 +671,11 @@ class cachestat: return False -def lookupreg(key, valname=None, scope=None): +def lookupreg( + key: bytes, + valname: Optional[bytes] = None, + scope: Optional[Union[int, Iterable[int]]] = None, +) -> Optional[bytes]: """Look up a key/value name in the Windows registry. valname: value name. If unspecified, the default value for the key @@ -693,12 +711,12 @@ def lookupreg(key, valname=None, scope=N expandglobs = True -def statislink(st): +def statislink(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is a symlink''' return False -def statisexec(st): +def statisexec(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is an executable file''' return False @@ -708,7 +726,7 @@ def poll(fds): raise NotImplementedError() -def readpipe(pipe): +def readpipe(pipe) -> bytes: """Read all available data from a pipe.""" chunks = [] while True: @@ -724,5 +742,5 @@ def readpipe(pipe): return b''.join(chunks) -def bindunixsocket(sock, path): +def bindunixsocket(sock, path: bytes) -> NoReturn: raise NotImplementedError('unsupported platform')