diff --git a/mercurial/posix.py b/mercurial/posix.py --- a/mercurial/posix.py +++ b/mercurial/posix.py @@ -20,9 +20,13 @@ import tempfile import unicodedata from typing import ( + Iterable, + Iterator, List, NoReturn, Optional, + Sequence, + Union, ) from .i18n import _ @@ -91,7 +95,7 @@ def split(p): return ht[0] + b'/', ht[1] -def openhardlinks(): +def openhardlinks() -> bool: '''return true if it is safe to hold open file handles to hardlinks''' return True @@ -101,7 +105,7 @@ def nlinks(name: bytes) -> int: return os.lstat(name).st_nlink -def parsepatchoutput(output_line): +def parsepatchoutput(output_line: bytes) -> bytes: """parses the output produced by patch and returns the filename""" pf = output_line[14:] if pycompat.sysplatform == b'OpenVMS': @@ -113,7 +117,9 @@ def parsepatchoutput(output_line): return pf -def sshargs(sshcmd, host, user, port): +def sshargs( + sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] +) -> bytes: '''Build argument list for ssh''' args = user and (b"%s@%s" % (user, host)) or host if b'-' in args[:1]: @@ -126,12 +132,12 @@ def sshargs(sshcmd, host, user, port): return args -def isexec(f): +def isexec(f: bytes) -> bool: """check whether a file is executable""" return os.lstat(f).st_mode & 0o100 != 0 -def setflags(f, l, x): +def setflags(f: bytes, l: bool, x: bool) -> None: st = os.lstat(f) s = st.st_mode if l: @@ -175,7 +181,12 @@ def setflags(f, l, x): os.chmod(f, s & 0o666) -def copymode(src, dst, mode=None, enforcewritable=False): +def copymode( + src: bytes, + dst: bytes, + mode: Optional[bytes] = None, + enforcewritable: bool = False, +) -> None: """Copy the file mode from the file at path src to dst. If src doesn't exist, we're using mode instead. If mode is None, we're using umask.""" @@ -195,7 +206,7 @@ def copymode(src, dst, mode=None, enforc os.chmod(dst, new_mode) -def checkexec(path): +def checkexec(path: bytes) -> bool: """ Check whether the given path is on a filesystem with UNIX-like exec flags @@ -275,7 +286,7 @@ def checkexec(path): return False -def checklink(path): +def checklink(path: bytes) -> bool: """check whether the given path is on a symlink-capable filesystem""" # mktemp is not racy because symlink creation will fail if the # file already exists @@ -362,19 +373,19 @@ def getfstype(dirpath: bytes) -> Optiona return getattr(osutil, 'getfstype', lambda x: None)(dirpath) -def get_password(): +def get_password() -> bytes: return encoding.strtolocal(getpass.getpass('')) -def setbinary(fd): +def setbinary(fd) -> None: pass -def pconvert(path): +def pconvert(path: bytes) -> bytes: return path -def localpath(path): +def localpath(path: bytes) -> bytes: return path @@ -393,7 +404,7 @@ def samedevice(fpath1: bytes, fpath2: by # os.path.normcase is a no-op, which doesn't help us on non-native filesystems -def normcase(path): +def normcase(path: bytes) -> bytes: return path.lower() @@ -404,7 +415,7 @@ normcasefallback = normcase if pycompat.isdarwin: - def normcase(path): + def normcase(path: bytes) -> bytes: """ Normalize a filename for OS X-compatible comparison: - escape-encode invalid characters @@ -429,7 +440,7 @@ if pycompat.isdarwin: normcasespec = encoding.normcasespecs.lower - def normcasefallback(path): + def normcasefallback(path: bytes) -> bytes: try: u = path.decode('utf-8') except UnicodeDecodeError: @@ -470,7 +481,7 @@ if pycompat.sysplatform == b'cygwin': ) # use upper-ing as normcase as same as NTFS workaround - def normcase(path): + def normcase(path: bytes) -> bytes: pathlen = len(path) if (pathlen == 0) or (path[0] != pycompat.ossep): # treat as relative @@ -496,20 +507,20 @@ if pycompat.sysplatform == b'cygwin': # but these translations are not supported by native # tools, so the exec bit tends to be set erroneously. # Therefore, disable executable bit access on Cygwin. - def checkexec(path): + def checkexec(path: bytes) -> bool: return False # Similarly, Cygwin's symlink emulation is likely to create # problems when Mercurial is used from both Cygwin and native # Windows, with other native tools, or on shared volumes - def checklink(path): + def checklink(path: bytes) -> bool: return False _needsshellquote = None -def shellquote(s): +def shellquote(s: bytes) -> bytes: if pycompat.sysplatform == b'OpenVMS': return b'"%s"' % s global _needsshellquote @@ -522,7 +533,7 @@ def shellquote(s): return b"'%s'" % s.replace(b"'", b"'\\''") -def shellsplit(s): +def shellsplit(s: bytes) -> List[bytes]: """Parse a command string in POSIX shell way (best-effort)""" return pycompat.shlexsplit(s, posix=True) @@ -538,12 +549,12 @@ def testpid(pid: int) -> bool: return inst.errno != errno.ESRCH -def isowner(st): +def isowner(st: os.stat_result) -> bool: """Return True if the stat object st is from the current user.""" return st.st_uid == os.getuid() -def findexe(command): +def findexe(command: bytes) -> Optional[bytes]: """Find executable for command searching like which does. If command is a basename then PATH is searched for command. PATH isn't searched if command is an absolute or relative path. @@ -551,7 +562,7 @@ def findexe(command): if pycompat.sysplatform == b'OpenVMS': return command - def findexisting(executable): + def findexisting(executable: bytes) -> Optional[bytes]: b'Will return executable if existing file' if os.path.isfile(executable) and os.access(executable, os.X_OK): return executable @@ -577,7 +588,7 @@ def setsignalhandler() -> None: _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} -def statfiles(files): +def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: """Stat each file in files. Yield each stat, or None if a file does not exist or has a type we don't care about.""" lstat = os.lstat @@ -597,7 +608,7 @@ def getuser() -> bytes: return pycompat.fsencode(getpass.getuser()) -def username(uid=None): +def username(uid: Optional[int] = None) -> Optional[bytes]: """Return the name of the user with the given uid. If uid is None, return the name of the current user.""" @@ -610,7 +621,7 @@ def username(uid=None): return b'%d' % uid -def groupname(gid=None): +def groupname(gid: Optional[int] = None) -> Optional[bytes]: """Return the name of the group with the given gid. If gid is None, return the name of the current group.""" @@ -623,7 +634,7 @@ def groupname(gid=None): return pycompat.bytestr(gid) -def groupmembers(name): +def groupmembers(name: bytes) -> List[bytes]: """Return the list of members of the group with the given name, KeyError if the group does not exist. """ @@ -643,7 +654,11 @@ def makedir(path: bytes, notindexed: boo os.mkdir(path) -def lookupreg(key, name=None, scope=None): +def lookupreg( + key: bytes, + name: Optional[bytes] = None, + scope: Optional[Union[int, Iterable[int]]] = None, +) -> Optional[bytes]: return None @@ -690,14 +705,14 @@ class cachestat: return not self == other -def statislink(st): +def statislink(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is a symlink''' - return st and stat.S_ISLNK(st.st_mode) + return stat.S_ISLNK(st.st_mode) if st else False -def statisexec(st): +def statisexec(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is an executable file''' - return st and (st.st_mode & 0o100 != 0) + return (st.st_mode & 0o100 != 0) if st else False def poll(fds): @@ -714,7 +729,7 @@ def poll(fds): return sorted(list(set(sum(res, [])))) -def readpipe(pipe): +def readpipe(pipe) -> bytes: """Read all available data from a pipe.""" # We can't fstat() a pipe because Linux will always report 0. # So, we set the pipe to non-blocking mode and read everything @@ -739,7 +754,7 @@ def readpipe(pipe): fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags) -def bindunixsocket(sock, path): +def bindunixsocket(sock, path: bytes) -> None: """Bind the UNIX domain socket to the specified path""" # use relative path instead of full path at bind() if possible, since # AF_UNIX path has very small length limit (107 chars) on common diff --git a/mercurial/windows.py b/mercurial/windows.py --- a/mercurial/windows.py +++ b/mercurial/windows.py @@ -18,6 +18,13 @@ import winreg # pytype: disable=import- from typing import ( BinaryIO, + Iterable, + Iterator, + List, + NoReturn, + Optional, + Sequence, + Union, ) from .i18n import _ @@ -183,7 +190,7 @@ def posixfile(name, mode=b'r', buffering listdir = osutil.listdir -def get_password(): +def get_password() -> bytes: """Prompt for password with echo off, using Windows getch(). This shouldn't be called directly- use ``ui.getpass()`` instead, which @@ -244,11 +251,11 @@ class winstdout(typelib.BinaryIO_Proxy): raise IOError(errno.EPIPE, 'Broken pipe') -def openhardlinks(): +def openhardlinks() -> bool: return True -def parsepatchoutput(output_line): +def parsepatchoutput(output_line: bytes) -> bytes: """parses the output produced by patch and returns the filename""" pf = output_line[14:] if pf[0] == b'`': @@ -256,7 +263,9 @@ def parsepatchoutput(output_line): return pf -def sshargs(sshcmd, host, user, port): +def sshargs( + sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes] +) -> bytes: '''Build argument list for ssh or Plink''' pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p' args = user and (b"%s@%s" % (user, host)) or host @@ -271,23 +280,28 @@ def sshargs(sshcmd, host, user, port): return args -def setflags(f, l, x): - pass - - -def copymode(src, dst, mode=None, enforcewritable=False): +def setflags(f: bytes, l: bool, x: bool) -> None: pass -def checkexec(path): +def copymode( + src: bytes, + dst: bytes, + mode: Optional[bytes] = None, + enforcewritable: bool = False, +) -> None: + pass + + +def checkexec(path: bytes) -> bool: return False -def checklink(path): +def checklink(path: bytes) -> bool: return False -def setbinary(fd): +def setbinary(fd) -> None: # When run without console, pipes may expose invalid # fileno(), usually set to -1. fno = getattr(fd, 'fileno', None) @@ -295,11 +309,11 @@ def setbinary(fd): msvcrt.setmode(fno(), os.O_BINARY) # pytype: disable=module-attr -def pconvert(path): +def pconvert(path: bytes) -> bytes: return path.replace(pycompat.ossep, b'/') -def localpath(path): +def localpath(path: bytes) -> bytes: return path.replace(b'/', b'\\') @@ -307,7 +321,7 @@ def normpath(path): return pconvert(os.path.normpath(path)) -def normcase(path): +def normcase(path: bytes) -> bytes: return encoding.upper(path) # NTFS compares via upper() @@ -468,7 +482,7 @@ def shelltocmdexe(path, env): _needsshellquote = None -def shellquote(s): +def shellquote(s: bytes) -> bytes: r""" >>> shellquote(br'C:\Users\xyz') '"C:\\Users\\xyz"' @@ -504,18 +518,18 @@ def _unquote(s): return s -def shellsplit(s): +def shellsplit(s: bytes) -> List[bytes]: """Parse a command string in cmd.exe way (best-effort)""" return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False)) # if you change this stub into a real check, please try to implement the # username and groupname functions above, too. -def isowner(st): +def isowner(st: os.stat_result) -> bool: return True -def findexe(command): +def findexe(command: bytes) -> Optional[bytes]: """Find executable for command searching like cmd.exe does. If command is a basename then PATH is searched for command. PATH isn't searched if command is an absolute or relative path. @@ -526,7 +540,7 @@ def findexe(command): if os.path.splitext(command)[1].lower() in pathexts: pathexts = [b''] - def findexisting(pathcommand): + def findexisting(pathcommand: bytes) -> Optional[bytes]: """Will append extension (if needed) and return existing file""" for ext in pathexts: executable = pathcommand + ext @@ -547,7 +561,7 @@ def findexe(command): _wantedkinds = {stat.S_IFREG, stat.S_IFLNK} -def statfiles(files): +def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]: """Stat each file in files. Yield each stat, or None if a file does not exist or has a type we don't care about. @@ -573,7 +587,7 @@ def statfiles(files): yield cache.get(base, None) -def username(uid=None): +def username(uid: Optional[int] = None) -> Optional[bytes]: """Return the name of the user with the given uid. If uid is None, return the name of the current user.""" @@ -588,7 +602,7 @@ def username(uid=None): return None -def groupname(gid=None): +def groupname(gid: Optional[int] = None) -> Optional[bytes]: """Return the name of the group with the given gid. If gid is None, return the name of the current group.""" @@ -640,12 +654,12 @@ def gethgcmd(): return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]] -def groupmembers(name): +def groupmembers(name: bytes) -> List[bytes]: # Don't support groups on Windows for now raise KeyError -def isexec(f): +def isexec(f: bytes) -> bool: return False @@ -657,7 +671,11 @@ class cachestat: return False -def lookupreg(key, valname=None, scope=None): +def lookupreg( + key: bytes, + valname: Optional[bytes] = None, + scope: Optional[Union[int, Iterable[int]]] = None, +) -> Optional[bytes]: """Look up a key/value name in the Windows registry. valname: value name. If unspecified, the default value for the key @@ -693,12 +711,12 @@ def lookupreg(key, valname=None, scope=N expandglobs = True -def statislink(st): +def statislink(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is a symlink''' return False -def statisexec(st): +def statisexec(st: Optional[os.stat_result]) -> bool: '''check whether a stat result is an executable file''' return False @@ -708,7 +726,7 @@ def poll(fds): raise NotImplementedError() -def readpipe(pipe): +def readpipe(pipe) -> bytes: """Read all available data from a pipe.""" chunks = [] while True: @@ -724,5 +742,5 @@ def readpipe(pipe): return b''.join(chunks) -def bindunixsocket(sock, path): +def bindunixsocket(sock, path: bytes) -> NoReturn: raise NotImplementedError('unsupported platform')