##// END OF EJS Templates
typing: add basic type hints to vfs.py...
Matt Harbison -
r50468:cc9a6005 default
parent child Browse files
Show More
@@ -11,6 +11,10 b' import shutil'
11 import stat
11 import stat
12 import threading
12 import threading
13
13
14 from typing import (
15 Optional,
16 )
17
14 from .i18n import _
18 from .i18n import _
15 from .pycompat import (
19 from .pycompat import (
16 delattr,
20 delattr,
@@ -26,7 +30,7 b' from . import ('
26 )
30 )
27
31
28
32
29 def _avoidambig(path, oldstat):
33 def _avoidambig(path: bytes, oldstat):
30 """Avoid file stat ambiguity forcibly
34 """Avoid file stat ambiguity forcibly
31
35
32 This function causes copying ``path`` file, if it is owned by
36 This function causes copying ``path`` file, if it is owned by
@@ -60,16 +64,17 b' class abstractvfs:'
60 '''Prevent instantiation; don't call this from subclasses.'''
64 '''Prevent instantiation; don't call this from subclasses.'''
61 raise NotImplementedError('attempted instantiating ' + str(type(self)))
65 raise NotImplementedError('attempted instantiating ' + str(type(self)))
62
66
63 def __call__(self, path, mode=b'rb', **kwargs):
67 # TODO: type return, which is util.posixfile wrapped by a proxy
68 def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs):
64 raise NotImplementedError
69 raise NotImplementedError
65
70
66 def _auditpath(self, path, mode):
71 def _auditpath(self, path: bytes, mode: bytes):
67 raise NotImplementedError
72 raise NotImplementedError
68
73
69 def join(self, path, *insidef):
74 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
70 raise NotImplementedError
75 raise NotImplementedError
71
76
72 def tryread(self, path):
77 def tryread(self, path: bytes) -> bytes:
73 '''gracefully return an empty string for missing files'''
78 '''gracefully return an empty string for missing files'''
74 try:
79 try:
75 return self.read(path)
80 return self.read(path)
@@ -77,7 +82,7 b' class abstractvfs:'
77 pass
82 pass
78 return b""
83 return b""
79
84
80 def tryreadlines(self, path, mode=b'rb'):
85 def tryreadlines(self, path: bytes, mode: bytes = b'rb'):
81 '''gracefully return an empty array for missing files'''
86 '''gracefully return an empty array for missing files'''
82 try:
87 try:
83 return self.readlines(path, mode=mode)
88 return self.readlines(path, mode=mode)
@@ -95,57 +100,61 b' class abstractvfs:'
95 """
100 """
96 return self.__call__
101 return self.__call__
97
102
98 def read(self, path):
103 def read(self, path: bytes) -> bytes:
99 with self(path, b'rb') as fp:
104 with self(path, b'rb') as fp:
100 return fp.read()
105 return fp.read()
101
106
102 def readlines(self, path, mode=b'rb'):
107 def readlines(self, path: bytes, mode: bytes = b'rb'):
103 with self(path, mode=mode) as fp:
108 with self(path, mode=mode) as fp:
104 return fp.readlines()
109 return fp.readlines()
105
110
106 def write(self, path, data, backgroundclose=False, **kwargs):
111 def write(
112 self, path: bytes, data: bytes, backgroundclose=False, **kwargs
113 ) -> int:
107 with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
114 with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
108 return fp.write(data)
115 return fp.write(data)
109
116
110 def writelines(self, path, data, mode=b'wb', notindexed=False):
117 def writelines(
118 self, path: bytes, data: bytes, mode: bytes = b'wb', notindexed=False
119 ) -> None:
111 with self(path, mode=mode, notindexed=notindexed) as fp:
120 with self(path, mode=mode, notindexed=notindexed) as fp:
112 return fp.writelines(data)
121 return fp.writelines(data)
113
122
114 def append(self, path, data):
123 def append(self, path: bytes, data: bytes) -> int:
115 with self(path, b'ab') as fp:
124 with self(path, b'ab') as fp:
116 return fp.write(data)
125 return fp.write(data)
117
126
118 def basename(self, path):
127 def basename(self, path: bytes) -> bytes:
119 """return base element of a path (as os.path.basename would do)
128 """return base element of a path (as os.path.basename would do)
120
129
121 This exists to allow handling of strange encoding if needed."""
130 This exists to allow handling of strange encoding if needed."""
122 return os.path.basename(path)
131 return os.path.basename(path)
123
132
124 def chmod(self, path, mode):
133 def chmod(self, path: bytes, mode: int) -> None:
125 return os.chmod(self.join(path), mode)
134 return os.chmod(self.join(path), mode)
126
135
127 def dirname(self, path):
136 def dirname(self, path: bytes) -> bytes:
128 """return dirname element of a path (as os.path.dirname would do)
137 """return dirname element of a path (as os.path.dirname would do)
129
138
130 This exists to allow handling of strange encoding if needed."""
139 This exists to allow handling of strange encoding if needed."""
131 return os.path.dirname(path)
140 return os.path.dirname(path)
132
141
133 def exists(self, path=None):
142 def exists(self, path: Optional[bytes] = None) -> bool:
134 return os.path.exists(self.join(path))
143 return os.path.exists(self.join(path))
135
144
136 def fstat(self, fp):
145 def fstat(self, fp):
137 return util.fstat(fp)
146 return util.fstat(fp)
138
147
139 def isdir(self, path=None):
148 def isdir(self, path: Optional[bytes] = None) -> bool:
140 return os.path.isdir(self.join(path))
149 return os.path.isdir(self.join(path))
141
150
142 def isfile(self, path=None):
151 def isfile(self, path: Optional[bytes] = None) -> bool:
143 return os.path.isfile(self.join(path))
152 return os.path.isfile(self.join(path))
144
153
145 def islink(self, path=None):
154 def islink(self, path: Optional[bytes] = None) -> bool:
146 return os.path.islink(self.join(path))
155 return os.path.islink(self.join(path))
147
156
148 def isfileorlink(self, path=None):
157 def isfileorlink(self, path: Optional[bytes] = None) -> bool:
149 """return whether path is a regular file or a symlink
158 """return whether path is a regular file or a symlink
150
159
151 Unlike isfile, this doesn't follow symlinks."""
160 Unlike isfile, this doesn't follow symlinks."""
@@ -156,7 +165,7 b' class abstractvfs:'
156 mode = st.st_mode
165 mode = st.st_mode
157 return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
166 return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
158
167
159 def _join(self, *paths):
168 def _join(self, *paths: bytes) -> bytes:
160 root_idx = 0
169 root_idx = 0
161 for idx, p in enumerate(paths):
170 for idx, p in enumerate(paths):
162 if os.path.isabs(p) or p.startswith(self._dir_sep):
171 if os.path.isabs(p) or p.startswith(self._dir_sep):
@@ -166,41 +175,48 b' class abstractvfs:'
166 paths = [p for p in paths if p]
175 paths = [p for p in paths if p]
167 return self._dir_sep.join(paths)
176 return self._dir_sep.join(paths)
168
177
169 def reljoin(self, *paths):
178 def reljoin(self, *paths: bytes) -> bytes:
170 """join various elements of a path together (as os.path.join would do)
179 """join various elements of a path together (as os.path.join would do)
171
180
172 The vfs base is not injected so that path stay relative. This exists
181 The vfs base is not injected so that path stay relative. This exists
173 to allow handling of strange encoding if needed."""
182 to allow handling of strange encoding if needed."""
174 return self._join(*paths)
183 return self._join(*paths)
175
184
176 def split(self, path):
185 def split(self, path: bytes):
177 """split top-most element of a path (as os.path.split would do)
186 """split top-most element of a path (as os.path.split would do)
178
187
179 This exists to allow handling of strange encoding if needed."""
188 This exists to allow handling of strange encoding if needed."""
180 return os.path.split(path)
189 return os.path.split(path)
181
190
182 def lexists(self, path=None):
191 def lexists(self, path: Optional[bytes] = None) -> bool:
183 return os.path.lexists(self.join(path))
192 return os.path.lexists(self.join(path))
184
193
185 def lstat(self, path=None):
194 def lstat(self, path: Optional[bytes] = None):
186 return os.lstat(self.join(path))
195 return os.lstat(self.join(path))
187
196
188 def listdir(self, path=None):
197 def listdir(self, path: Optional[bytes] = None):
189 return os.listdir(self.join(path))
198 return os.listdir(self.join(path))
190
199
191 def makedir(self, path=None, notindexed=True):
200 def makedir(self, path: Optional[bytes] = None, notindexed=True):
192 return util.makedir(self.join(path), notindexed)
201 return util.makedir(self.join(path), notindexed)
193
202
194 def makedirs(self, path=None, mode=None):
203 def makedirs(
204 self, path: Optional[bytes] = None, mode: Optional[int] = None
205 ):
195 return util.makedirs(self.join(path), mode)
206 return util.makedirs(self.join(path), mode)
196
207
197 def makelock(self, info, path):
208 def makelock(self, info, path: bytes):
198 return util.makelock(info, self.join(path))
209 return util.makelock(info, self.join(path))
199
210
200 def mkdir(self, path=None):
211 def mkdir(self, path: Optional[bytes] = None):
201 return os.mkdir(self.join(path))
212 return os.mkdir(self.join(path))
202
213
203 def mkstemp(self, suffix=b'', prefix=b'tmp', dir=None):
214 def mkstemp(
215 self,
216 suffix: bytes = b'',
217 prefix: bytes = b'tmp',
218 dir: Optional[bytes] = None,
219 ):
204 fd, name = pycompat.mkstemp(
220 fd, name = pycompat.mkstemp(
205 suffix=suffix, prefix=prefix, dir=self.join(dir)
221 suffix=suffix, prefix=prefix, dir=self.join(dir)
206 )
222 )
@@ -210,13 +226,13 b' class abstractvfs:'
210 else:
226 else:
211 return fd, fname
227 return fd, fname
212
228
213 def readdir(self, path=None, stat=None, skip=None):
229 def readdir(self, path: Optional[bytes] = None, stat=None, skip=None):
214 return util.listdir(self.join(path), stat, skip)
230 return util.listdir(self.join(path), stat, skip)
215
231
216 def readlock(self, path):
232 def readlock(self, path: bytes) -> bytes:
217 return util.readlock(self.join(path))
233 return util.readlock(self.join(path))
218
234
219 def rename(self, src, dst, checkambig=False):
235 def rename(self, src: bytes, dst: bytes, checkambig=False):
220 """Rename from src to dst
236 """Rename from src to dst
221
237
222 checkambig argument is used with util.filestat, and is useful
238 checkambig argument is used with util.filestat, and is useful
@@ -238,18 +254,20 b' class abstractvfs:'
238 return ret
254 return ret
239 return util.rename(srcpath, dstpath)
255 return util.rename(srcpath, dstpath)
240
256
241 def readlink(self, path):
257 def readlink(self, path: bytes) -> bytes:
242 return util.readlink(self.join(path))
258 return util.readlink(self.join(path))
243
259
244 def removedirs(self, path=None):
260 def removedirs(self, path: Optional[bytes] = None):
245 """Remove a leaf directory and all empty intermediate ones"""
261 """Remove a leaf directory and all empty intermediate ones"""
246 return util.removedirs(self.join(path))
262 return util.removedirs(self.join(path))
247
263
248 def rmdir(self, path=None):
264 def rmdir(self, path: Optional[bytes] = None):
249 """Remove an empty directory."""
265 """Remove an empty directory."""
250 return os.rmdir(self.join(path))
266 return os.rmdir(self.join(path))
251
267
252 def rmtree(self, path=None, ignore_errors=False, forcibly=False):
268 def rmtree(
269 self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
270 ):
253 """Remove a directory tree recursively
271 """Remove a directory tree recursively
254
272
255 If ``forcibly``, this tries to remove READ-ONLY files, too.
273 If ``forcibly``, this tries to remove READ-ONLY files, too.
@@ -272,28 +290,30 b' class abstractvfs:'
272 self.join(path), ignore_errors=ignore_errors, onerror=onerror
290 self.join(path), ignore_errors=ignore_errors, onerror=onerror
273 )
291 )
274
292
275 def setflags(self, path, l, x):
293 def setflags(self, path: bytes, l: bool, x: bool):
276 return util.setflags(self.join(path), l, x)
294 return util.setflags(self.join(path), l, x)
277
295
278 def stat(self, path=None):
296 def stat(self, path: Optional[bytes] = None):
279 return os.stat(self.join(path))
297 return os.stat(self.join(path))
280
298
281 def unlink(self, path=None):
299 def unlink(self, path: Optional[bytes] = None):
282 return util.unlink(self.join(path))
300 return util.unlink(self.join(path))
283
301
284 def tryunlink(self, path=None):
302 def tryunlink(self, path: Optional[bytes] = None):
285 """Attempt to remove a file, ignoring missing file errors."""
303 """Attempt to remove a file, ignoring missing file errors."""
286 util.tryunlink(self.join(path))
304 util.tryunlink(self.join(path))
287
305
288 def unlinkpath(self, path=None, ignoremissing=False, rmdir=True):
306 def unlinkpath(
307 self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
308 ):
289 return util.unlinkpath(
309 return util.unlinkpath(
290 self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
310 self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
291 )
311 )
292
312
293 def utime(self, path=None, t=None):
313 def utime(self, path: Optional[bytes] = None, t=None):
294 return os.utime(self.join(path), t)
314 return os.utime(self.join(path), t)
295
315
296 def walk(self, path=None, onerror=None):
316 def walk(self, path: Optional[bytes] = None, onerror=None):
297 """Yield (dirpath, dirs, files) tuple for each directories under path
317 """Yield (dirpath, dirs, files) tuple for each directories under path
298
318
299 ``dirpath`` is relative one from the root of this vfs. This
319 ``dirpath`` is relative one from the root of this vfs. This
@@ -360,7 +380,7 b' class vfs(abstractvfs):'
360
380
361 def __init__(
381 def __init__(
362 self,
382 self,
363 base,
383 base: bytes,
364 audit=True,
384 audit=True,
365 cacheaudited=False,
385 cacheaudited=False,
366 expandpath=False,
386 expandpath=False,
@@ -381,7 +401,7 b' class vfs(abstractvfs):'
381 self.options = {}
401 self.options = {}
382
402
383 @util.propertycache
403 @util.propertycache
384 def _cansymlink(self):
404 def _cansymlink(self) -> bool:
385 return util.checklink(self.base)
405 return util.checklink(self.base)
386
406
387 @util.propertycache
407 @util.propertycache
@@ -393,7 +413,7 b' class vfs(abstractvfs):'
393 return
413 return
394 os.chmod(name, self.createmode & 0o666)
414 os.chmod(name, self.createmode & 0o666)
395
415
396 def _auditpath(self, path, mode):
416 def _auditpath(self, path, mode) -> None:
397 if self._audit:
417 if self._audit:
398 if os.path.isabs(path) and path.startswith(self.base):
418 if os.path.isabs(path) and path.startswith(self.base):
399 path = os.path.relpath(path, self.base)
419 path = os.path.relpath(path, self.base)
@@ -404,8 +424,8 b' class vfs(abstractvfs):'
404
424
405 def __call__(
425 def __call__(
406 self,
426 self,
407 path,
427 path: bytes,
408 mode=b"r",
428 mode: bytes = b"r",
409 atomictemp=False,
429 atomictemp=False,
410 notindexed=False,
430 notindexed=False,
411 backgroundclose=False,
431 backgroundclose=False,
@@ -518,7 +538,7 b' class vfs(abstractvfs):'
518
538
519 return fp
539 return fp
520
540
521 def symlink(self, src, dst):
541 def symlink(self, src: bytes, dst: bytes) -> None:
522 self.audit(dst)
542 self.audit(dst)
523 linkname = self.join(dst)
543 linkname = self.join(dst)
524 util.tryunlink(linkname)
544 util.tryunlink(linkname)
@@ -538,7 +558,7 b' class vfs(abstractvfs):'
538 else:
558 else:
539 self.write(dst, src)
559 self.write(dst, src)
540
560
541 def join(self, path, *insidef):
561 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
542 if path:
562 if path:
543 parts = [self.base, path]
563 parts = [self.base, path]
544 parts.extend(insidef)
564 parts.extend(insidef)
@@ -551,7 +571,7 b' opener = vfs'
551
571
552
572
553 class proxyvfs(abstractvfs):
573 class proxyvfs(abstractvfs):
554 def __init__(self, vfs):
574 def __init__(self, vfs: "vfs"):
555 self.vfs = vfs
575 self.vfs = vfs
556
576
557 def _auditpath(self, path, mode):
577 def _auditpath(self, path, mode):
@@ -569,14 +589,14 b' class proxyvfs(abstractvfs):'
569 class filtervfs(proxyvfs, abstractvfs):
589 class filtervfs(proxyvfs, abstractvfs):
570 '''Wrapper vfs for filtering filenames with a function.'''
590 '''Wrapper vfs for filtering filenames with a function.'''
571
591
572 def __init__(self, vfs, filter):
592 def __init__(self, vfs: "vfs", filter):
573 proxyvfs.__init__(self, vfs)
593 proxyvfs.__init__(self, vfs)
574 self._filter = filter
594 self._filter = filter
575
595
576 def __call__(self, path, *args, **kwargs):
596 def __call__(self, path: bytes, *args, **kwargs):
577 return self.vfs(self._filter(path), *args, **kwargs)
597 return self.vfs(self._filter(path), *args, **kwargs)
578
598
579 def join(self, path, *insidef):
599 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
580 if path:
600 if path:
581 return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
601 return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
582 else:
602 else:
@@ -589,15 +609,15 b' filteropener = filtervfs'
589 class readonlyvfs(proxyvfs):
609 class readonlyvfs(proxyvfs):
590 '''Wrapper vfs preventing any writing.'''
610 '''Wrapper vfs preventing any writing.'''
591
611
592 def __init__(self, vfs):
612 def __init__(self, vfs: "vfs"):
593 proxyvfs.__init__(self, vfs)
613 proxyvfs.__init__(self, vfs)
594
614
595 def __call__(self, path, mode=b'r', *args, **kw):
615 def __call__(self, path: bytes, mode: bytes = b'r', *args, **kw):
596 if mode not in (b'r', b'rb'):
616 if mode not in (b'r', b'rb'):
597 raise error.Abort(_(b'this vfs is read only'))
617 raise error.Abort(_(b'this vfs is read only'))
598 return self.vfs(path, mode, *args, **kw)
618 return self.vfs(path, mode, *args, **kw)
599
619
600 def join(self, path, *insidef):
620 def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
601 return self.vfs.join(path, *insidef)
621 return self.vfs.join(path, *insidef)
602
622
603
623
General Comments 0
You need to be logged in to leave comments. Login now