##// END OF EJS Templates
typing: add basic type hints to vfs.py...
Matt Harbison -
r50468:cc9a6005 default
parent child Browse files
Show More
@@ -11,6 +11,10 b' import shutil'
11 11 import stat
12 12 import threading
13 13
14 from typing import (
15 Optional,
16 )
17
14 18 from .i18n import _
15 19 from .pycompat import (
16 20 delattr,
@@ -26,7 +30,7 b' from . import ('
26 30 )
27 31
28 32
29 def _avoidambig(path, oldstat):
33 def _avoidambig(path: bytes, oldstat):
30 34 """Avoid file stat ambiguity forcibly
31 35
32 36 This function causes copying ``path`` file, if it is owned by
@@ -60,16 +64,17 b' class abstractvfs:'
60 64 '''Prevent instantiation; don't call this from subclasses.'''
61 65 raise NotImplementedError('attempted instantiating ' + str(type(self)))
62 66
63 def __call__(self, path, mode=b'rb', **kwargs):
67 # TODO: type return, which is util.posixfile wrapped by a proxy
68 def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs):
64 69 raise NotImplementedError
65 70
66 def _auditpath(self, path, mode):
71 def _auditpath(self, path: bytes, mode: bytes):
67 72 raise NotImplementedError
68 73
69 def join(self, path, *insidef):
74 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
70 75 raise NotImplementedError
71 76
72 def tryread(self, path):
77 def tryread(self, path: bytes) -> bytes:
73 78 '''gracefully return an empty string for missing files'''
74 79 try:
75 80 return self.read(path)
@@ -77,7 +82,7 b' class abstractvfs:'
77 82 pass
78 83 return b""
79 84
80 def tryreadlines(self, path, mode=b'rb'):
85 def tryreadlines(self, path: bytes, mode: bytes = b'rb'):
81 86 '''gracefully return an empty array for missing files'''
82 87 try:
83 88 return self.readlines(path, mode=mode)
@@ -95,57 +100,61 b' class abstractvfs:'
95 100 """
96 101 return self.__call__
97 102
98 def read(self, path):
103 def read(self, path: bytes) -> bytes:
99 104 with self(path, b'rb') as fp:
100 105 return fp.read()
101 106
102 def readlines(self, path, mode=b'rb'):
107 def readlines(self, path: bytes, mode: bytes = b'rb'):
103 108 with self(path, mode=mode) as fp:
104 109 return fp.readlines()
105 110
106 def write(self, path, data, backgroundclose=False, **kwargs):
111 def write(
112 self, path: bytes, data: bytes, backgroundclose=False, **kwargs
113 ) -> int:
107 114 with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
108 115 return fp.write(data)
109 116
110 def writelines(self, path, data, mode=b'wb', notindexed=False):
117 def writelines(
118 self, path: bytes, data: bytes, mode: bytes = b'wb', notindexed=False
119 ) -> None:
111 120 with self(path, mode=mode, notindexed=notindexed) as fp:
112 121 return fp.writelines(data)
113 122
114 def append(self, path, data):
123 def append(self, path: bytes, data: bytes) -> int:
115 124 with self(path, b'ab') as fp:
116 125 return fp.write(data)
117 126
118 def basename(self, path):
127 def basename(self, path: bytes) -> bytes:
119 128 """return base element of a path (as os.path.basename would do)
120 129
121 130 This exists to allow handling of strange encoding if needed."""
122 131 return os.path.basename(path)
123 132
124 def chmod(self, path, mode):
133 def chmod(self, path: bytes, mode: int) -> None:
125 134 return os.chmod(self.join(path), mode)
126 135
127 def dirname(self, path):
136 def dirname(self, path: bytes) -> bytes:
128 137 """return dirname element of a path (as os.path.dirname would do)
129 138
130 139 This exists to allow handling of strange encoding if needed."""
131 140 return os.path.dirname(path)
132 141
133 def exists(self, path=None):
142 def exists(self, path: Optional[bytes] = None) -> bool:
134 143 return os.path.exists(self.join(path))
135 144
136 145 def fstat(self, fp):
137 146 return util.fstat(fp)
138 147
139 def isdir(self, path=None):
148 def isdir(self, path: Optional[bytes] = None) -> bool:
140 149 return os.path.isdir(self.join(path))
141 150
142 def isfile(self, path=None):
151 def isfile(self, path: Optional[bytes] = None) -> bool:
143 152 return os.path.isfile(self.join(path))
144 153
145 def islink(self, path=None):
154 def islink(self, path: Optional[bytes] = None) -> bool:
146 155 return os.path.islink(self.join(path))
147 156
148 def isfileorlink(self, path=None):
157 def isfileorlink(self, path: Optional[bytes] = None) -> bool:
149 158 """return whether path is a regular file or a symlink
150 159
151 160 Unlike isfile, this doesn't follow symlinks."""
@@ -156,7 +165,7 b' class abstractvfs:'
156 165 mode = st.st_mode
157 166 return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
158 167
159 def _join(self, *paths):
168 def _join(self, *paths: bytes) -> bytes:
160 169 root_idx = 0
161 170 for idx, p in enumerate(paths):
162 171 if os.path.isabs(p) or p.startswith(self._dir_sep):
@@ -166,41 +175,48 b' class abstractvfs:'
166 175 paths = [p for p in paths if p]
167 176 return self._dir_sep.join(paths)
168 177
169 def reljoin(self, *paths):
178 def reljoin(self, *paths: bytes) -> bytes:
170 179 """join various elements of a path together (as os.path.join would do)
171 180
172 181 The vfs base is not injected so that path stay relative. This exists
173 182 to allow handling of strange encoding if needed."""
174 183 return self._join(*paths)
175 184
176 def split(self, path):
185 def split(self, path: bytes):
177 186 """split top-most element of a path (as os.path.split would do)
178 187
179 188 This exists to allow handling of strange encoding if needed."""
180 189 return os.path.split(path)
181 190
182 def lexists(self, path=None):
191 def lexists(self, path: Optional[bytes] = None) -> bool:
183 192 return os.path.lexists(self.join(path))
184 193
185 def lstat(self, path=None):
194 def lstat(self, path: Optional[bytes] = None):
186 195 return os.lstat(self.join(path))
187 196
188 def listdir(self, path=None):
197 def listdir(self, path: Optional[bytes] = None):
189 198 return os.listdir(self.join(path))
190 199
191 def makedir(self, path=None, notindexed=True):
200 def makedir(self, path: Optional[bytes] = None, notindexed=True):
192 201 return util.makedir(self.join(path), notindexed)
193 202
194 def makedirs(self, path=None, mode=None):
203 def makedirs(
204 self, path: Optional[bytes] = None, mode: Optional[int] = None
205 ):
195 206 return util.makedirs(self.join(path), mode)
196 207
197 def makelock(self, info, path):
208 def makelock(self, info, path: bytes):
198 209 return util.makelock(info, self.join(path))
199 210
200 def mkdir(self, path=None):
211 def mkdir(self, path: Optional[bytes] = None):
201 212 return os.mkdir(self.join(path))
202 213
203 def mkstemp(self, suffix=b'', prefix=b'tmp', dir=None):
214 def mkstemp(
215 self,
216 suffix: bytes = b'',
217 prefix: bytes = b'tmp',
218 dir: Optional[bytes] = None,
219 ):
204 220 fd, name = pycompat.mkstemp(
205 221 suffix=suffix, prefix=prefix, dir=self.join(dir)
206 222 )
@@ -210,13 +226,13 b' class abstractvfs:'
210 226 else:
211 227 return fd, fname
212 228
213 def readdir(self, path=None, stat=None, skip=None):
229 def readdir(self, path: Optional[bytes] = None, stat=None, skip=None):
214 230 return util.listdir(self.join(path), stat, skip)
215 231
216 def readlock(self, path):
232 def readlock(self, path: bytes) -> bytes:
217 233 return util.readlock(self.join(path))
218 234
219 def rename(self, src, dst, checkambig=False):
235 def rename(self, src: bytes, dst: bytes, checkambig=False):
220 236 """Rename from src to dst
221 237
222 238 checkambig argument is used with util.filestat, and is useful
@@ -238,18 +254,20 b' class abstractvfs:'
238 254 return ret
239 255 return util.rename(srcpath, dstpath)
240 256
241 def readlink(self, path):
257 def readlink(self, path: bytes) -> bytes:
242 258 return util.readlink(self.join(path))
243 259
244 def removedirs(self, path=None):
260 def removedirs(self, path: Optional[bytes] = None):
245 261 """Remove a leaf directory and all empty intermediate ones"""
246 262 return util.removedirs(self.join(path))
247 263
248 def rmdir(self, path=None):
264 def rmdir(self, path: Optional[bytes] = None):
249 265 """Remove an empty directory."""
250 266 return os.rmdir(self.join(path))
251 267
252 def rmtree(self, path=None, ignore_errors=False, forcibly=False):
268 def rmtree(
269 self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
270 ):
253 271 """Remove a directory tree recursively
254 272
255 273 If ``forcibly``, this tries to remove READ-ONLY files, too.
@@ -272,28 +290,30 b' class abstractvfs:'
272 290 self.join(path), ignore_errors=ignore_errors, onerror=onerror
273 291 )
274 292
275 def setflags(self, path, l, x):
293 def setflags(self, path: bytes, l: bool, x: bool):
276 294 return util.setflags(self.join(path), l, x)
277 295
278 def stat(self, path=None):
296 def stat(self, path: Optional[bytes] = None):
279 297 return os.stat(self.join(path))
280 298
281 def unlink(self, path=None):
299 def unlink(self, path: Optional[bytes] = None):
282 300 return util.unlink(self.join(path))
283 301
284 def tryunlink(self, path=None):
302 def tryunlink(self, path: Optional[bytes] = None):
285 303 """Attempt to remove a file, ignoring missing file errors."""
286 304 util.tryunlink(self.join(path))
287 305
288 def unlinkpath(self, path=None, ignoremissing=False, rmdir=True):
306 def unlinkpath(
307 self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
308 ):
289 309 return util.unlinkpath(
290 310 self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
291 311 )
292 312
293 def utime(self, path=None, t=None):
313 def utime(self, path: Optional[bytes] = None, t=None):
294 314 return os.utime(self.join(path), t)
295 315
296 def walk(self, path=None, onerror=None):
316 def walk(self, path: Optional[bytes] = None, onerror=None):
297 317 """Yield (dirpath, dirs, files) tuple for each directories under path
298 318
299 319 ``dirpath`` is relative one from the root of this vfs. This
@@ -360,7 +380,7 b' class vfs(abstractvfs):'
360 380
361 381 def __init__(
362 382 self,
363 base,
383 base: bytes,
364 384 audit=True,
365 385 cacheaudited=False,
366 386 expandpath=False,
@@ -381,7 +401,7 b' class vfs(abstractvfs):'
381 401 self.options = {}
382 402
383 403 @util.propertycache
384 def _cansymlink(self):
404 def _cansymlink(self) -> bool:
385 405 return util.checklink(self.base)
386 406
387 407 @util.propertycache
@@ -393,7 +413,7 b' class vfs(abstractvfs):'
393 413 return
394 414 os.chmod(name, self.createmode & 0o666)
395 415
396 def _auditpath(self, path, mode):
416 def _auditpath(self, path, mode) -> None:
397 417 if self._audit:
398 418 if os.path.isabs(path) and path.startswith(self.base):
399 419 path = os.path.relpath(path, self.base)
@@ -404,8 +424,8 b' class vfs(abstractvfs):'
404 424
405 425 def __call__(
406 426 self,
407 path,
408 mode=b"r",
427 path: bytes,
428 mode: bytes = b"r",
409 429 atomictemp=False,
410 430 notindexed=False,
411 431 backgroundclose=False,
@@ -518,7 +538,7 b' class vfs(abstractvfs):'
518 538
519 539 return fp
520 540
521 def symlink(self, src, dst):
541 def symlink(self, src: bytes, dst: bytes) -> None:
522 542 self.audit(dst)
523 543 linkname = self.join(dst)
524 544 util.tryunlink(linkname)
@@ -538,7 +558,7 b' class vfs(abstractvfs):'
538 558 else:
539 559 self.write(dst, src)
540 560
541 def join(self, path, *insidef):
561 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
542 562 if path:
543 563 parts = [self.base, path]
544 564 parts.extend(insidef)
@@ -551,7 +571,7 b' opener = vfs'
551 571
552 572
553 573 class proxyvfs(abstractvfs):
554 def __init__(self, vfs):
574 def __init__(self, vfs: "vfs"):
555 575 self.vfs = vfs
556 576
557 577 def _auditpath(self, path, mode):
@@ -569,14 +589,14 b' class proxyvfs(abstractvfs):'
569 589 class filtervfs(proxyvfs, abstractvfs):
570 590 '''Wrapper vfs for filtering filenames with a function.'''
571 591
572 def __init__(self, vfs, filter):
592 def __init__(self, vfs: "vfs", filter):
573 593 proxyvfs.__init__(self, vfs)
574 594 self._filter = filter
575 595
576 def __call__(self, path, *args, **kwargs):
596 def __call__(self, path: bytes, *args, **kwargs):
577 597 return self.vfs(self._filter(path), *args, **kwargs)
578 598
579 def join(self, path, *insidef):
599 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
580 600 if path:
581 601 return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
582 602 else:
@@ -589,15 +609,15 b' filteropener = filtervfs'
589 609 class readonlyvfs(proxyvfs):
590 610 '''Wrapper vfs preventing any writing.'''
591 611
592 def __init__(self, vfs):
612 def __init__(self, vfs: "vfs"):
593 613 proxyvfs.__init__(self, vfs)
594 614
595 def __call__(self, path, mode=b'r', *args, **kw):
615 def __call__(self, path: bytes, mode: bytes = b'r', *args, **kw):
596 616 if mode not in (b'r', b'rb'):
597 617 raise error.Abort(_(b'this vfs is read only'))
598 618 return self.vfs(path, mode, *args, **kw)
599 619
600 def join(self, path, *insidef):
620 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
601 621 return self.vfs.join(path, *insidef)
602 622
603 623
General Comments 0
You need to be logged in to leave comments. Login now