diff --git a/mercurial/vfs.py b/mercurial/vfs.py --- a/mercurial/vfs.py +++ b/mercurial/vfs.py @@ -11,6 +11,10 @@ import shutil import stat import threading +from typing import ( + Optional, +) + from .i18n import _ from .pycompat import ( delattr, @@ -26,7 +30,7 @@ from . import ( ) -def _avoidambig(path, oldstat): +def _avoidambig(path: bytes, oldstat): """Avoid file stat ambiguity forcibly This function causes copying ``path`` file, if it is owned by @@ -60,16 +64,17 @@ class abstractvfs: '''Prevent instantiation; don't call this from subclasses.''' raise NotImplementedError('attempted instantiating ' + str(type(self))) - def __call__(self, path, mode=b'rb', **kwargs): + # TODO: type return, which is util.posixfile wrapped by a proxy + def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs): raise NotImplementedError - def _auditpath(self, path, mode): + def _auditpath(self, path: bytes, mode: bytes): raise NotImplementedError - def join(self, path, *insidef): + def join(self, path: Optional[bytes], *insidef: bytes) -> bytes: raise NotImplementedError - def tryread(self, path): + def tryread(self, path: bytes) -> bytes: '''gracefully return an empty string for missing files''' try: return self.read(path) @@ -77,7 +82,7 @@ class abstractvfs: pass return b"" - def tryreadlines(self, path, mode=b'rb'): + def tryreadlines(self, path: bytes, mode: bytes = b'rb'): '''gracefully return an empty array for missing files''' try: return self.readlines(path, mode=mode) @@ -95,57 +100,61 @@ class abstractvfs: """ return self.__call__ - def read(self, path): + def read(self, path: bytes) -> bytes: with self(path, b'rb') as fp: return fp.read() - def readlines(self, path, mode=b'rb'): + def readlines(self, path: bytes, mode: bytes = b'rb'): with self(path, mode=mode) as fp: return fp.readlines() - def write(self, path, data, backgroundclose=False, **kwargs): + def write( + self, path: bytes, data: bytes, backgroundclose=False, **kwargs + ) -> int: with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp: return fp.write(data) - def writelines(self, path, data, mode=b'wb', notindexed=False): + def writelines( + self, path: bytes, data: bytes, mode: bytes = b'wb', notindexed=False + ) -> None: with self(path, mode=mode, notindexed=notindexed) as fp: return fp.writelines(data) - def append(self, path, data): + def append(self, path: bytes, data: bytes) -> int: with self(path, b'ab') as fp: return fp.write(data) - def basename(self, path): + def basename(self, path: bytes) -> bytes: """return base element of a path (as os.path.basename would do) This exists to allow handling of strange encoding if needed.""" return os.path.basename(path) - def chmod(self, path, mode): + def chmod(self, path: bytes, mode: int) -> None: return os.chmod(self.join(path), mode) - def dirname(self, path): + def dirname(self, path: bytes) -> bytes: """return dirname element of a path (as os.path.dirname would do) This exists to allow handling of strange encoding if needed.""" return os.path.dirname(path) - def exists(self, path=None): + def exists(self, path: Optional[bytes] = None) -> bool: return os.path.exists(self.join(path)) def fstat(self, fp): return util.fstat(fp) - def isdir(self, path=None): + def isdir(self, path: Optional[bytes] = None) -> bool: return os.path.isdir(self.join(path)) - def isfile(self, path=None): + def isfile(self, path: Optional[bytes] = None) -> bool: return os.path.isfile(self.join(path)) - def islink(self, path=None): + def islink(self, path: Optional[bytes] = None) -> bool: return os.path.islink(self.join(path)) - def isfileorlink(self, path=None): + def isfileorlink(self, path: Optional[bytes] = None) -> bool: """return whether path is a regular file or a symlink Unlike isfile, this doesn't follow symlinks.""" @@ -156,7 +165,7 @@ class abstractvfs: mode = st.st_mode return stat.S_ISREG(mode) or stat.S_ISLNK(mode) - def _join(self, *paths): + def _join(self, *paths: bytes) -> bytes: root_idx = 0 for idx, p in enumerate(paths): if os.path.isabs(p) or p.startswith(self._dir_sep): @@ -166,41 +175,48 @@ class abstractvfs: paths = [p for p in paths if p] return self._dir_sep.join(paths) - def reljoin(self, *paths): + def reljoin(self, *paths: bytes) -> bytes: """join various elements of a path together (as os.path.join would do) The vfs base is not injected so that path stay relative. This exists to allow handling of strange encoding if needed.""" return self._join(*paths) - def split(self, path): + def split(self, path: bytes): """split top-most element of a path (as os.path.split would do) This exists to allow handling of strange encoding if needed.""" return os.path.split(path) - def lexists(self, path=None): + def lexists(self, path: Optional[bytes] = None) -> bool: return os.path.lexists(self.join(path)) - def lstat(self, path=None): + def lstat(self, path: Optional[bytes] = None): return os.lstat(self.join(path)) - def listdir(self, path=None): + def listdir(self, path: Optional[bytes] = None): return os.listdir(self.join(path)) - def makedir(self, path=None, notindexed=True): + def makedir(self, path: Optional[bytes] = None, notindexed=True): return util.makedir(self.join(path), notindexed) - def makedirs(self, path=None, mode=None): + def makedirs( + self, path: Optional[bytes] = None, mode: Optional[int] = None + ): return util.makedirs(self.join(path), mode) - def makelock(self, info, path): + def makelock(self, info, path: bytes): return util.makelock(info, self.join(path)) - def mkdir(self, path=None): + def mkdir(self, path: Optional[bytes] = None): return os.mkdir(self.join(path)) - def mkstemp(self, suffix=b'', prefix=b'tmp', dir=None): + def mkstemp( + self, + suffix: bytes = b'', + prefix: bytes = b'tmp', + dir: Optional[bytes] = None, + ): fd, name = pycompat.mkstemp( suffix=suffix, prefix=prefix, dir=self.join(dir) ) @@ -210,13 +226,13 @@ class abstractvfs: else: return fd, fname - def readdir(self, path=None, stat=None, skip=None): + def readdir(self, path: Optional[bytes] = None, stat=None, skip=None): return util.listdir(self.join(path), stat, skip) - def readlock(self, path): + def readlock(self, path: bytes) -> bytes: return util.readlock(self.join(path)) - def rename(self, src, dst, checkambig=False): + def rename(self, src: bytes, dst: bytes, checkambig=False): """Rename from src to dst checkambig argument is used with util.filestat, and is useful @@ -238,18 +254,20 @@ class abstractvfs: return ret return util.rename(srcpath, dstpath) - def readlink(self, path): + def readlink(self, path: bytes) -> bytes: return util.readlink(self.join(path)) - def removedirs(self, path=None): + def removedirs(self, path: Optional[bytes] = None): """Remove a leaf directory and all empty intermediate ones""" return util.removedirs(self.join(path)) - def rmdir(self, path=None): + def rmdir(self, path: Optional[bytes] = None): """Remove an empty directory.""" return os.rmdir(self.join(path)) - def rmtree(self, path=None, ignore_errors=False, forcibly=False): + def rmtree( + self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False + ): """Remove a directory tree recursively If ``forcibly``, this tries to remove READ-ONLY files, too. @@ -272,28 +290,30 @@ class abstractvfs: self.join(path), ignore_errors=ignore_errors, onerror=onerror ) - def setflags(self, path, l, x): + def setflags(self, path: bytes, l: bool, x: bool): return util.setflags(self.join(path), l, x) - def stat(self, path=None): + def stat(self, path: Optional[bytes] = None): return os.stat(self.join(path)) - def unlink(self, path=None): + def unlink(self, path: Optional[bytes] = None): return util.unlink(self.join(path)) - def tryunlink(self, path=None): + def tryunlink(self, path: Optional[bytes] = None): """Attempt to remove a file, ignoring missing file errors.""" util.tryunlink(self.join(path)) - def unlinkpath(self, path=None, ignoremissing=False, rmdir=True): + def unlinkpath( + self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True + ): return util.unlinkpath( self.join(path), ignoremissing=ignoremissing, rmdir=rmdir ) - def utime(self, path=None, t=None): + def utime(self, path: Optional[bytes] = None, t=None): return os.utime(self.join(path), t) - def walk(self, path=None, onerror=None): + def walk(self, path: Optional[bytes] = None, onerror=None): """Yield (dirpath, dirs, files) tuple for each directories under path ``dirpath`` is relative one from the root of this vfs. This @@ -360,7 +380,7 @@ class vfs(abstractvfs): def __init__( self, - base, + base: bytes, audit=True, cacheaudited=False, expandpath=False, @@ -381,7 +401,7 @@ class vfs(abstractvfs): self.options = {} @util.propertycache - def _cansymlink(self): + def _cansymlink(self) -> bool: return util.checklink(self.base) @util.propertycache @@ -393,7 +413,7 @@ class vfs(abstractvfs): return os.chmod(name, self.createmode & 0o666) - def _auditpath(self, path, mode): + def _auditpath(self, path, mode) -> None: if self._audit: if os.path.isabs(path) and path.startswith(self.base): path = os.path.relpath(path, self.base) @@ -404,8 +424,8 @@ class vfs(abstractvfs): def __call__( self, - path, - mode=b"r", + path: bytes, + mode: bytes = b"r", atomictemp=False, notindexed=False, backgroundclose=False, @@ -518,7 +538,7 @@ class vfs(abstractvfs): return fp - def symlink(self, src, dst): + def symlink(self, src: bytes, dst: bytes) -> None: self.audit(dst) linkname = self.join(dst) util.tryunlink(linkname) @@ -538,7 +558,7 @@ class vfs(abstractvfs): else: self.write(dst, src) - def join(self, path, *insidef): + def join(self, path: Optional[bytes], *insidef: bytes) -> bytes: if path: parts = [self.base, path] parts.extend(insidef) @@ -551,7 +571,7 @@ opener = vfs class proxyvfs(abstractvfs): - def __init__(self, vfs): + def __init__(self, vfs: "vfs"): self.vfs = vfs def _auditpath(self, path, mode): @@ -569,14 +589,14 @@ class proxyvfs(abstractvfs): class filtervfs(proxyvfs, abstractvfs): '''Wrapper vfs for filtering filenames with a function.''' - def __init__(self, vfs, filter): + def __init__(self, vfs: "vfs", filter): proxyvfs.__init__(self, vfs) self._filter = filter - def __call__(self, path, *args, **kwargs): + def __call__(self, path: bytes, *args, **kwargs): return self.vfs(self._filter(path), *args, **kwargs) - def join(self, path, *insidef): + def join(self, path: Optional[bytes], *insidef: bytes) -> bytes: if path: return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef))) else: @@ -589,15 +609,15 @@ filteropener = filtervfs class readonlyvfs(proxyvfs): '''Wrapper vfs preventing any writing.''' - def __init__(self, vfs): + def __init__(self, vfs: "vfs"): proxyvfs.__init__(self, vfs) - def __call__(self, path, mode=b'r', *args, **kw): + def __call__(self, path: bytes, mode: bytes = b'r', *args, **kw): if mode not in (b'r', b'rb'): raise error.Abort(_(b'this vfs is read only')) return self.vfs(path, mode, *args, **kw) - def join(self, path, *insidef): + def join(self, path: Optional[bytes], *insidef: bytes) -> bytes: return self.vfs.join(path, *insidef)