diff --git a/mercurial/vfs.py b/mercurial/vfs.py --- a/mercurial/vfs.py +++ b/mercurial/vfs.py @@ -17,9 +17,12 @@ import typing from typing import ( Any, + BinaryIO, + Callable, Iterable, Iterator, List, + MutableMapping, Optional, Tuple, Type, @@ -36,13 +39,18 @@ from . import ( ) if typing.TYPE_CHECKING: + from . import ( + ui as uimod, + ) + _Tbackgroundfilecloser = TypeVar( '_Tbackgroundfilecloser', bound='backgroundfilecloser' ) _Tclosewrapbase = TypeVar('_Tclosewrapbase', bound='closewrapbase') + _OnErrorFn = Callable[[Exception], Optional[object]] -def _avoidambig(path: bytes, oldstat) -> None: +def _avoidambig(path: bytes, oldstat: util.filestat) -> None: """Avoid file stat ambiguity forcibly This function causes copying ``path`` file, if it is owned by @@ -78,7 +86,7 @@ class abstractvfs(abc.ABC): ... @abc.abstractmethod - def _auditpath(self, path: bytes, mode: bytes) -> Any: + def _auditpath(self, path: bytes, mode: bytes) -> None: ... @abc.abstractmethod @@ -93,7 +101,7 @@ class abstractvfs(abc.ABC): pass return b"" - def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> Any: + def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]: '''gracefully return an empty array for missing files''' try: return self.readlines(path, mode=mode) @@ -115,12 +123,12 @@ class abstractvfs(abc.ABC): with self(path, b'rb') as fp: return fp.read() - def readlines(self, path: bytes, mode: bytes = b'rb') -> Any: + def readlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]: with self(path, mode=mode) as fp: return fp.readlines() def write( - self, path: bytes, data: bytes, backgroundclose=False, **kwargs + self, path: bytes, data: bytes, backgroundclose: bool = False, **kwargs ) -> int: with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp: return fp.write(data) @@ -130,7 +138,7 @@ class abstractvfs(abc.ABC): path: bytes, data: Iterable[bytes], mode: bytes = b'wb', - notindexed=False, + notindexed: bool = False, ) -> None: with self(path, mode=mode, notindexed=notindexed) as fp: return fp.writelines(data) @@ -157,7 +165,7 @@ class abstractvfs(abc.ABC): def exists(self, path: Optional[bytes] = None) -> bool: return os.path.exists(self.join(path)) - def fstat(self, fp) -> os.stat_result: + def fstat(self, fp: BinaryIO) -> os.stat_result: return util.fstat(fp) def isdir(self, path: Optional[bytes] = None) -> bool: @@ -249,7 +257,7 @@ class abstractvfs(abc.ABC): ) -> None: return util.makedirs(self.join(path), mode) - def makelock(self, info, path: bytes) -> None: + def makelock(self, info: bytes, path: bytes) -> None: return util.makelock(info, self.join(path)) def mkdir(self, path: Optional[bytes] = None) -> None: @@ -270,6 +278,12 @@ class abstractvfs(abc.ABC): else: return fd, fname + # TODO: This doesn't match osutil.listdir(). stat=False in pure; + # non-optional bool in cext. 'skip' is bool if we trust cext, or bytes + # going by how pure uses it. Also, cext returns a custom stat structure. + # from cext.osutil.pyi: + # + # path: bytes, st: bool, skip: Optional[bool] def readdir( self, path: Optional[bytes] = None, stat=None, skip=None ) -> Any: @@ -278,7 +292,7 @@ class abstractvfs(abc.ABC): def readlock(self, path: bytes) -> bytes: return util.readlock(self.join(path)) - def rename(self, src: bytes, dst: bytes, checkambig=False) -> None: + def rename(self, src: bytes, dst: bytes, checkambig: bool = False) -> None: """Rename from src to dst checkambig argument is used with util.filestat, and is useful @@ -312,7 +326,10 @@ class abstractvfs(abc.ABC): return os.rmdir(self.join(path)) def rmtree( - self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False + self, + path: Optional[bytes] = None, + ignore_errors: bool = False, + forcibly: bool = False, ) -> None: """Remove a directory tree recursively @@ -320,7 +337,7 @@ class abstractvfs(abc.ABC): """ if forcibly: - def onexc(function, path, excinfo): + def onexc(function, path: bytes, excinfo): if function is not os.remove: raise # read-only files cannot be unlinked under Windows @@ -357,17 +374,23 @@ class abstractvfs(abc.ABC): return util.tryunlink(self.join(path)) def unlinkpath( - self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True + self, + path: Optional[bytes] = None, + ignoremissing: bool = False, + rmdir: bool = True, ) -> None: return util.unlinkpath( self.join(path), ignoremissing=ignoremissing, rmdir=rmdir ) - def utime(self, path: Optional[bytes] = None, t=None) -> None: + # TODO: could be Tuple[float, float] too. + def utime( + self, path: Optional[bytes] = None, t: Optional[Tuple[int, int]] = None + ) -> None: return os.utime(self.join(path), t) def walk( - self, path: Optional[bytes] = None, onerror=None + self, path: Optional[bytes] = None, onerror: Optional[_OnErrorFn] = None ) -> Iterator[Tuple[bytes, List[bytes], List[bytes]]]: """Yield (dirpath, dirs, files) tuple for each directory under path @@ -386,7 +409,7 @@ class abstractvfs(abc.ABC): @contextlib.contextmanager def backgroundclosing( - self, ui, expectedcount=-1 + self, ui: uimod.ui, expectedcount: int = -1 ) -> Iterator[Optional[backgroundfilecloser]]: """Allow files to be closed asynchronously. @@ -417,7 +440,7 @@ class abstractvfs(abc.ABC): None # pytype: disable=attribute-error ) - def register_file(self, path) -> None: + def register_file(self, path: bytes) -> None: """generic hook point to lets fncache steer its stew""" @@ -432,13 +455,15 @@ class vfs(abstractvfs): See pathutil.pathauditor() for details. """ + createmode: Optional[int] + def __init__( self, base: bytes, - audit=True, - cacheaudited=False, - expandpath=False, - realpath=False, + audit: bool = True, + cacheaudited: bool = False, + expandpath: bool = False, + realpath: bool = False, ) -> None: if expandpath: base = util.expandpath(base) @@ -459,15 +484,15 @@ class vfs(abstractvfs): return util.checklink(self.base) @util.propertycache - def _chmod(self): + def _chmod(self) -> bool: return util.checkexec(self.base) - def _fixfilemode(self, name) -> None: + def _fixfilemode(self, name: bytes) -> None: if self.createmode is None or not self._chmod: return os.chmod(name, self.createmode & 0o666) - def _auditpath(self, path, mode) -> None: + def _auditpath(self, path: bytes, mode: bytes) -> None: if self._audit: if os.path.isabs(path) and path.startswith(self.base): path = os.path.relpath(path, self.base) @@ -477,7 +502,9 @@ class vfs(abstractvfs): self.audit(path, mode=mode) def isfileorlink_checkdir( - self, dircache, path: Optional[bytes] = None + self, + dircache: MutableMapping[bytes, bool], + path: Optional[bytes] = None, ) -> bool: """return True if the path is a regular file or a symlink and the directories along the path are "normal", that is @@ -486,6 +513,8 @@ class vfs(abstractvfs): Ignores the `_audit` setting, and checks the directories regardless. `dircache` is used to cache the directory checks. """ + # TODO: Should be a None check on 'path', or shouldn't default to None + # because of the immediate call to util.localpath(). try: for prefix in pathutil.finddirs_rev_noroot(util.localpath(path)): if prefix in dircache: @@ -505,13 +534,13 @@ class vfs(abstractvfs): self, path: bytes, mode: bytes = b"rb", - atomictemp=False, - notindexed=False, - backgroundclose=False, - checkambig=False, - auditpath=True, - makeparentdirs=True, - ) -> Any: + atomictemp: bool = False, + notindexed: bool = False, + backgroundclose: bool = False, + checkambig: bool = False, + auditpath: bool = True, + makeparentdirs: bool = True, + ) -> Any: # TODO: should be BinaryIO if util.atomictempfile can be coersed """Open ``path`` file, which is relative to vfs root. By default, parent directories are created as needed. Newly created @@ -650,14 +679,14 @@ opener: Type[vfs] = vfs class proxyvfs(abstractvfs, abc.ABC): - def __init__(self, vfs: "vfs"): + def __init__(self, vfs: vfs) -> None: self.vfs = vfs @property - def createmode(self): + def createmode(self) -> Optional[int]: return self.vfs.createmode - def _auditpath(self, path, mode) -> None: + def _auditpath(self, path: bytes, mode: bytes) -> None: return self.vfs._auditpath(path, mode) @property @@ -676,10 +705,11 @@ class proxyvfs(abstractvfs, abc.ABC): class filtervfs(proxyvfs, abstractvfs): '''Wrapper vfs for filtering filenames with a function.''' - def __init__(self, vfs: "vfs", filter): + def __init__(self, vfs: vfs, filter) -> None: proxyvfs.__init__(self, vfs) self._filter = filter + # TODO: The return type should be BinaryIO def __call__(self, path: bytes, *args, **kwargs) -> Any: return self.vfs(self._filter(path), *args, **kwargs) @@ -696,9 +726,10 @@ filteropener: Type[filtervfs] = filtervf class readonlyvfs(proxyvfs): '''Wrapper vfs preventing any writing.''' - def __init__(self, vfs: "vfs"): + def __init__(self, vfs: vfs) -> None: proxyvfs.__init__(self, vfs) + # TODO: The return type should be BinaryIO def __call__(self, path: bytes, mode: bytes = b'rb', *args, **kw) -> Any: if mode not in (b'r', b'rb'): raise error.Abort(_(b'this vfs is read only')) @@ -717,13 +748,13 @@ class closewrapbase(abc.ABC): def __init__(self, fh) -> None: object.__setattr__(self, '_origfh', fh) - def __getattr__(self, attr) -> Any: + def __getattr__(self, attr: str) -> Any: return getattr(self._origfh, attr) - def __setattr__(self, attr, value) -> None: + def __setattr__(self, attr: str, value: Any) -> None: return setattr(self._origfh, attr, value) - def __delattr__(self, attr) -> None: + def __delattr__(self, attr: str) -> None: return delattr(self._origfh, attr) def __enter__(self: _Tclosewrapbase) -> _Tclosewrapbase: @@ -759,7 +790,7 @@ class delayclosedfile(closewrapbase): class backgroundfilecloser: """Coordinates background closing of file handles on multiple threads.""" - def __init__(self, ui, expectedcount=-1) -> None: + def __init__(self, ui: uimod.ui, expectedcount: int = -1) -> None: self._running = False self._entered = False self._threads = []