vfs: simplify path audit disabling in stream clone

Author: marmoute
Changeset: r33255:15e9cbe6 (branch: default)
mercurial/streamclone.py
@@ -1,417 +1,418 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 phases,
17 17 store,
18 18 util,
19 19 )
20 20
21 21 def canperformstreamclone(pullop, bailifbundle2supported=False):
22 22 """Whether it is possible to perform a streaming clone as part of pull.
23 23
24 24 ``bailifbundle2supported`` will cause the function to return False if
25 25 bundle2 stream clones are supported. It should only be called by the
26 26 legacy stream clone code path.
27 27
28 28 Returns a tuple of (supported, requirements). ``supported`` is True if
29 29 streaming clone is supported and False otherwise. ``requirements`` is
30 30 a set of repo requirements from the remote, or ``None`` if stream clone
31 31 isn't supported.
32 32 """
33 33 repo = pullop.repo
34 34 remote = pullop.remote
35 35
36 36 bundle2supported = False
37 37 if pullop.canusebundle2:
38 38 if 'v1' in pullop.remotebundle2caps.get('stream', []):
39 39 bundle2supported = True
40 40 # else
41 41 # Server doesn't support bundle2 stream clone or doesn't support
42 42 # the versions we support. Fall back and possibly allow legacy.
43 43
44 44 # Ensures legacy code path uses available bundle2.
45 45 if bailifbundle2supported and bundle2supported:
46 46 return False, None
47 47 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
48 48 #elif not bailifbundle2supported and not bundle2supported:
49 49 # return False, None
50 50
51 51 # Streaming clone only works on empty repositories.
52 52 if len(repo):
53 53 return False, None
54 54
55 55 # Streaming clone only works if all data is being requested.
56 56 if pullop.heads:
57 57 return False, None
58 58
59 59 streamrequested = pullop.streamclonerequested
60 60
61 61 # If we don't have a preference, let the server decide for us. This
62 62 # likely only comes into play in LANs.
63 63 if streamrequested is None:
64 64 # The server can advertise whether to prefer streaming clone.
65 65 streamrequested = remote.capable('stream-preferred')
66 66
67 67 if not streamrequested:
68 68 return False, None
69 69
70 70 # In order for stream clone to work, the client has to support all the
71 71 # requirements advertised by the server.
72 72 #
73 73 # The server advertises its requirements via the "stream" and "streamreqs"
74 74 # capability. "stream" (a value-less capability) is advertised if and only
75 75 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
76 76 # is advertised and contains a comma-delimited list of requirements.
77 77 requirements = set()
78 78 if remote.capable('stream'):
79 79 requirements.add('revlogv1')
80 80 else:
81 81 streamreqs = remote.capable('streamreqs')
82 82 # This is weird and shouldn't happen with modern servers.
83 83 if not streamreqs:
84 84 pullop.repo.ui.warn(_(
85 85 'warning: stream clone requested but server has them '
86 86 'disabled\n'))
87 87 return False, None
88 88
89 89 streamreqs = set(streamreqs.split(','))
90 90 # Server requires something we don't support. Bail.
91 91 missingreqs = streamreqs - repo.supportedformats
92 92 if missingreqs:
93 93 pullop.repo.ui.warn(_(
94 94 'warning: stream clone requested but client is missing '
95 95 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
96 96 pullop.repo.ui.warn(
97 97 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
98 98 'for more information)\n'))
99 99 return False, None
100 100 requirements = streamreqs
101 101
102 102 return True, requirements
103 103
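The negotiation above boils down to a small decision procedure. A minimal sketch (not part of the commit; ``capable`` and ``supportedformats`` stand in for ``remote.capable()`` and ``repo.supportedformats``):

    def negotiate_stream_requirements(capable, supportedformats):
        # A value-less "stream" capability implies the sole requirement
        # revlogv1; otherwise "streamreqs" carries a comma-delimited list.
        if capable('stream'):
            return {'revlogv1'}
        streamreqs = capable('streamreqs')
        if not streamreqs:
            return None  # server has stream clones disabled
        reqs = set(streamreqs.split(','))
        if reqs - supportedformats:
            return None  # server needs a format this client lacks
        return reqs
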
104 104 def maybeperformlegacystreamclone(pullop):
105 105 """Possibly perform a legacy stream clone operation.
106 106
107 107 Legacy stream clones are performed as part of pull but before all other
108 108 operations.
109 109
110 110 A legacy stream clone will not be performed if a bundle2 stream clone is
111 111 supported.
112 112 """
113 113 supported, requirements = canperformstreamclone(pullop)
114 114
115 115 if not supported:
116 116 return
117 117
118 118 repo = pullop.repo
119 119 remote = pullop.remote
120 120
121 121 # Save remote branchmap. We will use it later to speed up branchcache
122 122 # creation.
123 123 rbranchmap = None
124 124 if remote.capable('branchmap'):
125 125 rbranchmap = remote.branchmap()
126 126
127 127 repo.ui.status(_('streaming all changes\n'))
128 128
129 129 fp = remote.stream_out()
130 130 l = fp.readline()
131 131 try:
132 132 resp = int(l)
133 133 except ValueError:
134 134 raise error.ResponseError(
135 135 _('unexpected response from remote server:'), l)
136 136 if resp == 1:
137 137 raise error.Abort(_('operation forbidden by server'))
138 138 elif resp == 2:
139 139 raise error.Abort(_('locking the remote repository failed'))
140 140 elif resp != 0:
141 141 raise error.Abort(_('the server sent an unknown error code'))
142 142
143 143 l = fp.readline()
144 144 try:
145 145 filecount, bytecount = map(int, l.split(' ', 1))
146 146 except (ValueError, TypeError):
147 147 raise error.ResponseError(
148 148 _('unexpected response from remote server:'), l)
149 149
150 150 with repo.lock():
151 151 consumev1(repo, fp, filecount, bytecount)
152 152
153 153 # new requirements = old non-format requirements +
154 154 # new format-related remote requirements
155 155 # requirements from the streamed-in repository
156 156 repo.requirements = requirements | (
157 157 repo.requirements - repo.supportedformats)
158 158 repo._applyopenerreqs()
159 159 repo._writerequirements()
160 160
161 161 if rbranchmap:
162 162 branchmap.replacecache(repo, rbranchmap)
163 163
164 164 repo.invalidate()
165 165
166 166 def allowservergeneration(repo):
167 167 """Whether streaming clones are allowed from the server."""
168 168 if not repo.ui.configbool('server', 'uncompressed', True, untrusted=True):
169 169 return False
170 170
171 171 # The way stream clone works makes it impossible to hide secret changesets.
172 172 # So don't allow this by default.
173 173 secret = phases.hassecret(repo)
174 174 if secret:
175 175 return repo.ui.configbool('server', 'uncompressedallowsecret')
176 176
177 177 return True
178 178
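Both checks consulted by ``allowservergeneration()`` correspond to hgrc settings on the serving repository; a minimal configuration sketch:

    [server]
    # refuse to generate stream clones entirely (defaults to true)
    uncompressed = false
    # or explicitly allow stream clones even with secret changesets present
    # uncompressedallowsecret = true
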
179 179 # This is its own function so extensions can override it.
180 180 def _walkstreamfiles(repo):
181 181 return repo.store.walk()
182 182
183 183 def generatev1(repo):
184 184 """Emit content for version 1 of a streaming clone.
185 185
186 186 This returns a 3-tuple of (file count, byte size, data iterator).
187 187
188 188 The data iterator consists of N entries for each file being transferred.
189 189 Each file entry starts as a line with the file name and integer size
190 190 delimited by a null byte.
191 191
192 192 The raw file data follows. Following the raw file data is the next file
193 193 entry, or EOF.
194 194
195 195 When used on the wire protocol, an additional line indicating protocol
196 196 success will be prepended to the stream. This function is not responsible
197 197 for adding it.
198 198
199 199 This function will obtain a repository lock to ensure a consistent view of
200 200 the store is captured. It therefore may raise LockError.
201 201 """
202 202 entries = []
203 203 total_bytes = 0
204 204 # Get consistent snapshot of repo, lock during scan.
205 205 with repo.lock():
206 206 repo.ui.debug('scanning\n')
207 207 for name, ename, size in _walkstreamfiles(repo):
208 208 if size:
209 209 entries.append((name, size))
210 210 total_bytes += size
211 211
212 212 repo.ui.debug('%d files, %d bytes to transfer\n' %
213 213 (len(entries), total_bytes))
214 214
215 215 svfs = repo.svfs
216 216 oldaudit = svfs.mustaudit
217 217 debugflag = repo.ui.debugflag
218 218 svfs.mustaudit = False
219 219
220 220 def emitrevlogdata():
221 221 try:
222 222 for name, size in entries:
223 223 if debugflag:
224 224 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
225 225 # partially encode name over the wire for backwards compat
226 226 yield '%s\0%d\n' % (store.encodedir(name), size)
227 227 if size <= 65536:
228 with svfs(name, 'rb') as fp:
228 with svfs(name, 'rb', auditpath=False) as fp:
229 229 yield fp.read(size)
230 230 else:
231 for chunk in util.filechunkiter(svfs(name), limit=size):
231 data = svfs(name, auditpath=False)
232 for chunk in util.filechunkiter(data, limit=size):
232 233 yield chunk
233 234 finally:
234 235 svfs.mustaudit = oldaudit
235 236
236 237 return len(entries), total_bytes, emitrevlogdata()
237 238
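The framing emitted by ``generatev1()`` (and consumed by ``consumev1()`` below) can be read back in a few lines. A sketch, assuming ``fp`` is a binary file object positioned at the first entry; on the wire, names are additionally dir-encoded via ``store.encodedir()``:

    def iter_v1_entries(fp, filecount):
        for _ in range(filecount):
            line = fp.readline()            # e.g. 'data/foo.i\x00123\n'
            name, size = line.rstrip('\n').split('\0', 1)
            yield name, fp.read(int(size))  # exactly ``size`` raw bytes
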
238 239 def generatev1wireproto(repo):
239 240 """Emit content for version 1 of streaming clone suitable for the wire.
240 241
241 242 This is the data output from ``generatev1()`` with a header line
242 243 indicating file count and byte size.
243 244 """
244 245 filecount, bytecount, it = generatev1(repo)
245 246 yield '%d %d\n' % (filecount, bytecount)
246 247 for chunk in it:
247 248 yield chunk
248 249
249 250 def generatebundlev1(repo, compression='UN'):
250 251 """Emit content for version 1 of a stream clone bundle.
251 252
252 253 The first 4 bytes of the output ("HGS1") denote this as stream clone
253 254 bundle version 1.
254 255
255 256 The next 2 bytes indicate the compression type. Only "UN" is currently
256 257 supported.
257 258
258 259 The next 16 bytes are two 64-bit big endian unsigned integers indicating
259 260 file count and byte count, respectively.
260 261
261 262 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
262 263 of the requirements string, including a trailing \0. The following N bytes
263 264 are the requirements string, which is ASCII containing a comma-delimited
264 265 list of repo requirements that are needed to support the data.
265 266
266 267 The remaining content is the output of ``generatev1()`` (which may be
267 268 compressed in the future).
268 269
269 270 Returns a tuple of (requirements, data generator).
270 271 """
271 272 if compression != 'UN':
272 273 raise ValueError('we do not support the compression argument yet')
273 274
274 275 requirements = repo.requirements & repo.supportedformats
275 276 requires = ','.join(sorted(requirements))
276 277
277 278 def gen():
278 279 yield 'HGS1'
279 280 yield compression
280 281
281 282 filecount, bytecount, it = generatev1(repo)
282 283 repo.ui.status(_('writing %d bytes for %d files\n') %
283 284 (bytecount, filecount))
284 285
285 286 yield struct.pack('>QQ', filecount, bytecount)
286 287 yield struct.pack('>H', len(requires) + 1)
287 288 yield requires + '\0'
288 289
289 290 # This is where we'll add compression in the future.
290 291 assert compression == 'UN'
291 292
292 293 seen = 0
293 294 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
294 295
295 296 for chunk in it:
296 297 seen += len(chunk)
297 298 repo.ui.progress(_('bundle'), seen, total=bytecount,
298 299 unit=_('bytes'))
299 300 yield chunk
300 301
301 302 repo.ui.progress(_('bundle'), None)
302 303
303 304 return requirements, gen()
304 305
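The header layout from the docstring can be assembled and verified with ``struct`` directly. A self-contained sketch mirroring ``generatebundlev1()`` and ``readbundle1header()`` (the counts and requirements are made-up values, written as Python 2 byte strings to match this codebase):

    import io
    import struct

    requires = 'revlogv1,generaldelta'
    header = ('HGS1' + 'UN'                          # magic + compression
              + struct.pack('>QQ', 3, 4096)          # file count, byte count
              + struct.pack('>H', len(requires) + 1) # length incl. trailing \0
              + requires + '\0')

    fp = io.BytesIO(header)
    assert fp.read(4) == 'HGS1'  # magic is validated by the caller before
                                 # readbundle1header(fp) takes over
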
305 306 def consumev1(repo, fp, filecount, bytecount):
306 307 """Apply the contents from version 1 of a streaming clone file handle.
307 308
308 309 This takes the output from "stream_out" and applies it to the specified
309 310 repository.
310 311
311 312 Like "stream_out," the status line added by the wire protocol is not
312 313 handled by this function.
313 314 """
314 315 with repo.lock():
315 316 repo.ui.status(_('%d files to transfer, %s of data\n') %
316 317 (filecount, util.bytecount(bytecount)))
317 318 handled_bytes = 0
318 319 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
319 320 start = util.timer()
320 321
321 322 # TODO: get rid of (potential) inconsistency
322 323 #
323 324 # If transaction is started and any @filecache property is
324 325 # changed at this point, it causes inconsistency between
325 326 # in-memory cached property and streamclone-ed file on the
326 327 # disk. Nested transaction prevents transaction scope "clone"
327 328 # below from writing in-memory changes out at the end of it,
328 329 # even though in-memory changes are discarded at the end of it
329 330 # regardless of transaction nesting.
330 331 #
331 332 # But transaction nesting can't be simply prohibited, because
332 333 # nesting occurs also in ordinary case (e.g. enabling
333 334 # clonebundles).
334 335
335 336 with repo.transaction('clone'):
336 337 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
337 338 for i in xrange(filecount):
338 339 # XXX doesn't support '\n' or '\r' in filenames
339 340 l = fp.readline()
340 341 try:
341 342 name, size = l.split('\0', 1)
342 343 size = int(size)
343 344 except (ValueError, TypeError):
344 345 raise error.ResponseError(
345 346 _('unexpected response from remote server:'), l)
346 347 if repo.ui.debugflag:
347 348 repo.ui.debug('adding %s (%s)\n' %
348 349 (name, util.bytecount(size)))
349 350 # for backwards compat, name was partially encoded
350 351 path = store.decodedir(name)
351 352 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
352 353 for chunk in util.filechunkiter(fp, limit=size):
353 354 handled_bytes += len(chunk)
354 355 repo.ui.progress(_('clone'), handled_bytes,
355 356 total=bytecount, unit=_('bytes'))
356 357 ofp.write(chunk)
357 358
358 359 # force @filecache properties to be reloaded from
359 360 # streamclone-ed file at next access
360 361 repo.invalidate(clearfilecache=True)
361 362
362 363 elapsed = util.timer() - start
363 364 if elapsed <= 0:
364 365 elapsed = 0.001
365 366 repo.ui.progress(_('clone'), None)
366 367 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
367 368 (util.bytecount(bytecount), elapsed,
368 369 util.bytecount(bytecount / elapsed)))
369 370
370 371 def readbundle1header(fp):
371 372 compression = fp.read(2)
372 373 if compression != 'UN':
373 374 raise error.Abort(_('only uncompressed stream clone bundles are '
374 375 'supported; got %s') % compression)
375 376
376 377 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
377 378 requireslen = struct.unpack('>H', fp.read(2))[0]
378 379 requires = fp.read(requireslen)
379 380
380 381 if not requires.endswith('\0'):
381 382 raise error.Abort(_('malformed stream clone bundle: '
382 383 'requirements not properly encoded'))
383 384
384 385 requirements = set(requires.rstrip('\0').split(','))
385 386
386 387 return filecount, bytecount, requirements
387 388
388 389 def applybundlev1(repo, fp):
389 390 """Apply the content from a stream clone bundle version 1.
390 391
391 392 We assume the 4 byte header has been read and validated and the file handle
392 393 is at the 2 byte compression identifier.
393 394 """
394 395 if len(repo):
395 396 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
396 397 'repo'))
397 398
398 399 filecount, bytecount, requirements = readbundle1header(fp)
399 400 missingreqs = requirements - repo.supportedformats
400 401 if missingreqs:
401 402 raise error.Abort(_('unable to apply stream clone: '
402 403 'unsupported format: %s') %
403 404 ', '.join(sorted(missingreqs)))
404 405
405 406 consumev1(repo, fp, filecount, bytecount)
406 407
407 408 class streamcloneapplier(object):
408 409 """Class to manage applying streaming clone bundles.
409 410
410 411 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
411 412 readers to perform bundle type-specific functionality.
412 413 """
413 414 def __init__(self, fh):
414 415 self._fh = fh
415 416
416 417 def apply(self, repo):
417 418 return applybundlev1(repo, self._fh)
mercurial/vfs.py
@@ -1,641 +1,643 @@
1 1 # vfs.py - Mercurial 'vfs' classes
2 2 #
3 3 # Copyright Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import errno
11 11 import os
12 12 import shutil
13 13 import stat
14 14 import tempfile
15 15 import threading
16 16
17 17 from .i18n import _
18 18 from . import (
19 19 error,
20 20 pathutil,
21 21 pycompat,
22 22 util,
23 23 )
24 24
25 25 class abstractvfs(object):
26 26 """Abstract base class; cannot be instantiated"""
27 27
28 28 def __init__(self, *args, **kwargs):
29 29 '''Prevent instantiation; don't call this from subclasses.'''
30 30 raise NotImplementedError('attempted instantiating ' + str(type(self)))
31 31
32 32 def tryread(self, path):
33 33 '''gracefully return an empty string for missing files'''
34 34 try:
35 35 return self.read(path)
36 36 except IOError as inst:
37 37 if inst.errno != errno.ENOENT:
38 38 raise
39 39 return ""
40 40
41 41 def tryreadlines(self, path, mode='rb'):
42 42 '''gracefully return an empty array for missing files'''
43 43 try:
44 44 return self.readlines(path, mode=mode)
45 45 except IOError as inst:
46 46 if inst.errno != errno.ENOENT:
47 47 raise
48 48 return []
49 49
50 50 @util.propertycache
51 51 def open(self):
52 52 '''Open ``path`` file, which is relative to vfs root.
53 53
54 54 Newly created directories are marked as "not to be indexed by
55 55 the content indexing service", if ``notindexed`` is specified
56 56 for "write" mode access.
57 57 '''
58 58 return self.__call__
59 59
60 60 def read(self, path):
61 61 with self(path, 'rb') as fp:
62 62 return fp.read()
63 63
64 64 def readlines(self, path, mode='rb'):
65 65 with self(path, mode=mode) as fp:
66 66 return fp.readlines()
67 67
68 68 def write(self, path, data, backgroundclose=False):
69 69 with self(path, 'wb', backgroundclose=backgroundclose) as fp:
70 70 return fp.write(data)
71 71
72 72 def writelines(self, path, data, mode='wb', notindexed=False):
73 73 with self(path, mode=mode, notindexed=notindexed) as fp:
74 74 return fp.writelines(data)
75 75
76 76 def append(self, path, data):
77 77 with self(path, 'ab') as fp:
78 78 return fp.write(data)
79 79
80 80 def basename(self, path):
81 81 """return base element of a path (as os.path.basename would do)
82 82
83 83 This exists to allow handling of strange encoding if needed."""
84 84 return os.path.basename(path)
85 85
86 86 def chmod(self, path, mode):
87 87 return os.chmod(self.join(path), mode)
88 88
89 89 def dirname(self, path):
90 90 """return dirname element of a path (as os.path.dirname would do)
91 91
92 92 This exists to allow handling of strange encoding if needed."""
93 93 return os.path.dirname(path)
94 94
95 95 def exists(self, path=None):
96 96 return os.path.exists(self.join(path))
97 97
98 98 def fstat(self, fp):
99 99 return util.fstat(fp)
100 100
101 101 def isdir(self, path=None):
102 102 return os.path.isdir(self.join(path))
103 103
104 104 def isfile(self, path=None):
105 105 return os.path.isfile(self.join(path))
106 106
107 107 def islink(self, path=None):
108 108 return os.path.islink(self.join(path))
109 109
110 110 def isfileorlink(self, path=None):
111 111 '''return whether path is a regular file or a symlink
112 112
113 113 Unlike isfile, this doesn't follow symlinks.'''
114 114 try:
115 115 st = self.lstat(path)
116 116 except OSError:
117 117 return False
118 118 mode = st.st_mode
119 119 return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
120 120
121 121 def reljoin(self, *paths):
122 122 """join various elements of a path together (as os.path.join would do)
123 123
124 124 The vfs base is not injected so that paths stay relative. This exists
125 125 to allow handling of strange encoding if needed."""
126 126 return os.path.join(*paths)
127 127
128 128 def split(self, path):
129 129 """split top-most element of a path (as os.path.split would do)
130 130
131 131 This exists to allow handling of strange encoding if needed."""
132 132 return os.path.split(path)
133 133
134 134 def lexists(self, path=None):
135 135 return os.path.lexists(self.join(path))
136 136
137 137 def lstat(self, path=None):
138 138 return os.lstat(self.join(path))
139 139
140 140 def listdir(self, path=None):
141 141 return os.listdir(self.join(path))
142 142
143 143 def makedir(self, path=None, notindexed=True):
144 144 return util.makedir(self.join(path), notindexed)
145 145
146 146 def makedirs(self, path=None, mode=None):
147 147 return util.makedirs(self.join(path), mode)
148 148
149 149 def makelock(self, info, path):
150 150 return util.makelock(info, self.join(path))
151 151
152 152 def mkdir(self, path=None):
153 153 return os.mkdir(self.join(path))
154 154
155 155 def mkstemp(self, suffix='', prefix='tmp', dir=None, text=False):
156 156 fd, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
157 157 dir=self.join(dir), text=text)
158 158 dname, fname = util.split(name)
159 159 if dir:
160 160 return fd, os.path.join(dir, fname)
161 161 else:
162 162 return fd, fname
163 163
164 164 def readdir(self, path=None, stat=None, skip=None):
165 165 return util.listdir(self.join(path), stat, skip)
166 166
167 167 def readlock(self, path):
168 168 return util.readlock(self.join(path))
169 169
170 170 def rename(self, src, dst, checkambig=False):
171 171 """Rename from src to dst
172 172
173 173 checkambig argument is used with util.filestat, and is useful
174 174 only if destination file is guarded by any lock
175 175 (e.g. repo.lock or repo.wlock).
176 176 """
177 177 srcpath = self.join(src)
178 178 dstpath = self.join(dst)
179 179 oldstat = checkambig and util.filestat.frompath(dstpath)
180 180 if oldstat and oldstat.stat:
181 181 def dorename(spath, dpath):
182 182 ret = util.rename(spath, dpath)
183 183 newstat = util.filestat.frompath(dpath)
184 184 if newstat.isambig(oldstat):
185 185 # stat of renamed file is ambiguous to original one
186 186 return ret, newstat.avoidambig(dpath, oldstat)
187 187 return ret, True
188 188 ret, avoided = dorename(srcpath, dstpath)
189 189 if not avoided:
190 190 # simply copy to change owner of srcpath (see issue5418)
191 191 util.copyfile(dstpath, srcpath)
192 192 ret, avoided = dorename(srcpath, dstpath)
193 193 return ret
194 194 return util.rename(srcpath, dstpath)
195 195
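The "ambiguity" that ``checkambig`` exists to break is the case where a file is replaced within the same second by content of the same size: a validator keyed on (size, mtime) cannot tell the two versions apart. A conceptual sketch of the problem (illustrative only, not Mercurial API):

    def stats_look_identical(st_old, st_new):
        # util.filestat.avoidambig() breaks exactly this tie by advancing
        # the new file's mtime so cached state gets invalidated.
        return (st_old.st_size == st_new.st_size and
                int(st_old.st_mtime) == int(st_new.st_mtime))
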
196 196 def readlink(self, path):
197 197 return os.readlink(self.join(path))
198 198
199 199 def removedirs(self, path=None):
200 200 """Remove a leaf directory and all empty intermediate ones
201 201 """
202 202 return util.removedirs(self.join(path))
203 203
204 204 def rmtree(self, path=None, ignore_errors=False, forcibly=False):
205 205 """Remove a directory tree recursively
206 206
207 207 If ``forcibly``, this tries to remove READ-ONLY files, too.
208 208 """
209 209 if forcibly:
210 210 def onerror(function, path, excinfo):
211 211 if function is not os.remove:
212 212 raise
213 213 # read-only files cannot be unlinked under Windows
214 214 s = os.stat(path)
215 215 if (s.st_mode & stat.S_IWRITE) != 0:
216 216 raise
217 217 os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE)
218 218 os.remove(path)
219 219 else:
220 220 onerror = None
221 221 return shutil.rmtree(self.join(path),
222 222 ignore_errors=ignore_errors, onerror=onerror)
223 223
224 224 def setflags(self, path, l, x):
225 225 return util.setflags(self.join(path), l, x)
226 226
227 227 def stat(self, path=None):
228 228 return os.stat(self.join(path))
229 229
230 230 def unlink(self, path=None):
231 231 return util.unlink(self.join(path))
232 232
233 233 def tryunlink(self, path=None):
234 234 """Attempt to remove a file, ignoring missing file errors."""
235 235 util.tryunlink(self.join(path))
236 236
237 237 def unlinkpath(self, path=None, ignoremissing=False):
238 238 return util.unlinkpath(self.join(path), ignoremissing=ignoremissing)
239 239
240 240 def utime(self, path=None, t=None):
241 241 return os.utime(self.join(path), t)
242 242
243 243 def walk(self, path=None, onerror=None):
244 244 """Yield (dirpath, dirs, files) tuple for each directories under path
245 245
246 246 ``dirpath`` is relative to the root of this vfs. This
247 247 uses ``os.sep`` as the path separator, even if you specify a POSIX
248 248 style ``path``.
249 249
250 250 "The root of this vfs" is represented as empty ``dirpath``.
251 251 """
252 252 root = os.path.normpath(self.join(None))
253 253 # when dirpath == root, dirpath[prefixlen:] becomes empty
254 254 # because len(dirpath) < prefixlen.
255 255 prefixlen = len(pathutil.normasprefix(root))
256 256 for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror):
257 257 yield (dirpath[prefixlen:], dirs, files)
258 258
259 259 @contextlib.contextmanager
260 260 def backgroundclosing(self, ui, expectedcount=-1):
261 261 """Allow files to be closed asynchronously.
262 262
263 263 When this context manager is active, ``backgroundclose`` can be passed
264 264 to ``__call__``/``open`` to result in the file possibly being closed
265 265 asynchronously, on a background thread.
266 266 """
267 267 # This is an arbitrary restriction and could be changed if we ever
268 268 # have a use case.
269 269 vfs = getattr(self, 'vfs', self)
270 270 if getattr(vfs, '_backgroundfilecloser', None):
271 271 raise error.Abort(
272 272 _('can only have 1 active background file closer'))
273 273
274 274 with backgroundfilecloser(ui, expectedcount=expectedcount) as bfc:
275 275 try:
276 276 vfs._backgroundfilecloser = bfc
277 277 yield bfc
278 278 finally:
279 279 vfs._backgroundfilecloser = None
280 280
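The intended calling pattern, as ``consumev1()`` in streamclone.py above uses it: enter the context manager first, then request ``backgroundclose`` per file (``repo`` and ``entries`` are illustrative):

    with repo.svfs.backgroundclosing(repo.ui, expectedcount=len(entries)):
        for name, data in entries:
            with repo.svfs(name, 'w', backgroundclose=True) as ofp:
                ofp.write(data)
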
281 281 class vfs(abstractvfs):
282 282 '''Operate files relative to a base directory
283 283
284 284 This class is used to hide the details of COW semantics and
285 285 remote file access from higher level code.
286 286 '''
287 287 def __init__(self, base, audit=True, expandpath=False, realpath=False):
288 288 if expandpath:
289 289 base = util.expandpath(base)
290 290 if realpath:
291 291 base = os.path.realpath(base)
292 292 self.base = base
293 293 self.mustaudit = audit
294 294 self.createmode = None
295 295 self._trustnlink = None
296 296
297 297 @property
298 298 def mustaudit(self):
299 299 return self._audit
300 300
301 301 @mustaudit.setter
302 302 def mustaudit(self, onoff):
303 303 self._audit = onoff
304 304 if onoff:
305 305 self.audit = pathutil.pathauditor(self.base)
306 306 else:
307 307 self.audit = util.always
308 308
309 309 @util.propertycache
310 310 def _cansymlink(self):
311 311 return util.checklink(self.base)
312 312
313 313 @util.propertycache
314 314 def _chmod(self):
315 315 return util.checkexec(self.base)
316 316
317 317 def _fixfilemode(self, name):
318 318 if self.createmode is None or not self._chmod:
319 319 return
320 320 os.chmod(name, self.createmode & 0o666)
321 321
322 322 def __call__(self, path, mode="r", text=False, atomictemp=False,
323 notindexed=False, backgroundclose=False, checkambig=False):
323 notindexed=False, backgroundclose=False, checkambig=False,
324 auditpath=True):
324 325 '''Open ``path`` file, which is relative to vfs root.
325 326
326 327 Newly created directories are marked as "not to be indexed by
327 328 the content indexing service", if ``notindexed`` is specified
328 329 for "write" mode access.
329 330
330 331 If ``backgroundclose`` is passed, the file may be closed asynchronously.
331 332 It can only be used if the ``self.backgroundclosing()`` context manager
332 333 is active. This should only be specified if the following criteria hold:
333 334
334 335 1. There is a potential for writing thousands of files. Unless you
335 336 are writing thousands of files, the performance benefits of
336 337 asynchronously closing files are not realized.
337 338 2. Files are opened exactly once for the ``backgroundclosing``
338 339 active duration and are therefore free of race conditions between
339 340 closing a file on a background thread and reopening it. (If the
340 341 file were opened multiple times, there could be unflushed data
341 342 because the original file handle hasn't been flushed/closed yet.)
342 343
343 344 ``checkambig`` argument is passed to atomictempfile (valid
344 345 only for writing), and is useful only if target file is
345 346 guarded by any lock (e.g. repo.lock or repo.wlock).
346 347 '''
348 if auditpath:
347 349 if self._audit:
348 350 r = util.checkosfilename(path)
349 351 if r:
350 352 raise error.Abort("%s: %r" % (r, path))
351 353 self.audit(path)
352 354 f = self.join(path)
353 355
354 356 if not text and "b" not in mode:
355 357 mode += "b" # for that other OS
356 358
357 359 nlink = -1
358 360 if mode not in ('r', 'rb'):
359 361 dirname, basename = util.split(f)
360 362 # If basename is empty, then the path is malformed because it points
361 363 # to a directory. Let the posixfile() call below raise IOError.
362 364 if basename:
363 365 if atomictemp:
364 366 util.makedirs(dirname, self.createmode, notindexed)
365 367 return util.atomictempfile(f, mode, self.createmode,
366 368 checkambig=checkambig)
367 369 try:
368 370 if 'w' in mode:
369 371 util.unlink(f)
370 372 nlink = 0
371 373 else:
372 374 # nlinks() may behave differently for files on Windows
373 375 # shares if the file is open.
374 376 with util.posixfile(f):
375 377 nlink = util.nlinks(f)
376 378 if nlink < 1:
377 379 nlink = 2 # force mktempcopy (issue1922)
378 380 except (OSError, IOError) as e:
379 381 if e.errno != errno.ENOENT:
380 382 raise
381 383 nlink = 0
382 384 util.makedirs(dirname, self.createmode, notindexed)
383 385 if nlink > 0:
384 386 if self._trustnlink is None:
385 387 self._trustnlink = nlink > 1 or util.checknlink(f)
386 388 if nlink > 1 or not self._trustnlink:
387 389 util.rename(util.mktempcopy(f), f)
388 390 fp = util.posixfile(f, mode)
389 391 if nlink == 0:
390 392 self._fixfilemode(f)
391 393
392 394 if checkambig:
393 395 if mode in ('r', 'rb'):
394 396 raise error.Abort(_('implementation error: mode %s is not'
395 397 ' valid for checkambig=True') % mode)
396 398 fp = checkambigatclosing(fp)
397 399
398 400 if backgroundclose:
399 401 if not self._backgroundfilecloser:
400 402 raise error.Abort(_('backgroundclose can only be used when a '
401 403 'backgroundclosing context manager is active')
402 404 )
403 405
404 406 fp = delayclosedfile(fp, self._backgroundfilecloser)
405 407
406 408 return fp
407 409
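The new ``auditpath`` argument gives callers such as the streamclone hunk above a per-open way to skip path auditing, rather than relying solely on the vfs-wide ``mustaudit`` toggle. The two patterns side by side (names taken from that hunk):

    # vfs-wide: disable auditing for every open, restore in a finally block
    oldaudit = svfs.mustaudit
    svfs.mustaudit = False
    try:
        fp = svfs(name, 'rb')
    finally:
        svfs.mustaudit = oldaudit

    # per-open: skip the audit for this single open only
    fp = svfs(name, 'rb', auditpath=False)
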
408 410 def symlink(self, src, dst):
409 411 self.audit(dst)
410 412 linkname = self.join(dst)
411 413 util.tryunlink(linkname)
412 414
413 415 util.makedirs(os.path.dirname(linkname), self.createmode)
414 416
415 417 if self._cansymlink:
416 418 try:
417 419 os.symlink(src, linkname)
418 420 except OSError as err:
419 421 raise OSError(err.errno, _('could not symlink to %r: %s') %
420 422 (src, err.strerror), linkname)
421 423 else:
422 424 self.write(dst, src)
423 425
424 426 def join(self, path, *insidef):
425 427 if path:
426 428 return os.path.join(self.base, path, *insidef)
427 429 else:
428 430 return self.base
429 431
430 432 opener = vfs
431 433
432 434 class auditvfs(object):
433 435 def __init__(self, vfs):
434 436 self.vfs = vfs
435 437
436 438 @property
437 439 def mustaudit(self):
438 440 return self.vfs.mustaudit
439 441
440 442 @mustaudit.setter
441 443 def mustaudit(self, onoff):
442 444 self.vfs.mustaudit = onoff
443 445
444 446 @property
445 447 def options(self):
446 448 return self.vfs.options
447 449
448 450 @options.setter
449 451 def options(self, value):
450 452 self.vfs.options = value
451 453
452 454 class filtervfs(abstractvfs, auditvfs):
453 455 '''Wrapper vfs for filtering filenames with a function.'''
454 456
455 457 def __init__(self, vfs, filter):
456 458 auditvfs.__init__(self, vfs)
457 459 self._filter = filter
458 460
459 461 def __call__(self, path, *args, **kwargs):
460 462 return self.vfs(self._filter(path), *args, **kwargs)
461 463
462 464 def join(self, path, *insidef):
463 465 if path:
464 466 return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
465 467 else:
466 468 return self.vfs.join(path)
467 469
468 470 filteropener = filtervfs
469 471
470 472 class readonlyvfs(abstractvfs, auditvfs):
471 473 '''Wrapper vfs preventing any writing.'''
472 474
473 475 def __init__(self, vfs):
474 476 auditvfs.__init__(self, vfs)
475 477
476 478 def __call__(self, path, mode='r', *args, **kw):
477 479 if mode not in ('r', 'rb'):
478 480 raise error.Abort(_('this vfs is read only'))
479 481 return self.vfs(path, mode, *args, **kw)
480 482
481 483 def join(self, path, *insidef):
482 484 return self.vfs.join(path, *insidef)
483 485
484 486 class closewrapbase(object):
485 487 """Base class of wrapper, which hooks closing
486 488
487 489 Do not instantiate outside of the vfs layer.
488 490 """
489 491 def __init__(self, fh):
490 492 object.__setattr__(self, r'_origfh', fh)
491 493
492 494 def __getattr__(self, attr):
493 495 return getattr(self._origfh, attr)
494 496
495 497 def __setattr__(self, attr, value):
496 498 return setattr(self._origfh, attr, value)
497 499
498 500 def __delattr__(self, attr):
499 501 return delattr(self._origfh, attr)
500 502
501 503 def __enter__(self):
502 504 return self._origfh.__enter__()
503 505
504 506 def __exit__(self, exc_type, exc_value, exc_tb):
505 507 raise NotImplementedError('%s must override __exit__' % type(self))
506 508
507 509 def close(self):
508 510 raise NotImplementedError('%s must override close()' % type(self))
509 511
510 512 class delayclosedfile(closewrapbase):
511 513 """Proxy for a file object whose close is delayed.
512 514
513 515 Do not instantiate outside of the vfs layer.
514 516 """
515 517 def __init__(self, fh, closer):
516 518 super(delayclosedfile, self).__init__(fh)
517 519 object.__setattr__(self, r'_closer', closer)
518 520
519 521 def __exit__(self, exc_type, exc_value, exc_tb):
520 522 self._closer.close(self._origfh)
521 523
522 524 def close(self):
523 525 self._closer.close(self._origfh)
524 526
525 527 class backgroundfilecloser(object):
526 528 """Coordinates background closing of file handles on multiple threads."""
527 529 def __init__(self, ui, expectedcount=-1):
528 530 self._running = False
529 531 self._entered = False
530 532 self._threads = []
531 533 self._threadexception = None
532 534
533 535 # Only Windows/NTFS has slow file closing. So only enable by default
534 536 # on that platform. But allow to be enabled elsewhere for testing.
535 537 defaultenabled = pycompat.osname == 'nt'
536 538 enabled = ui.configbool('worker', 'backgroundclose', defaultenabled)
537 539
538 540 if not enabled:
539 541 return
540 542
541 543 # There is overhead to starting and stopping the background threads.
542 544 # Don't do background processing unless the file count is large enough
543 545 # to justify it.
544 546 minfilecount = ui.configint('worker', 'backgroundcloseminfilecount')
545 547 # FUTURE dynamically start background threads after minfilecount closes.
546 548 # (We don't currently have any callers that don't know their file count)
547 549 if expectedcount > 0 and expectedcount < minfilecount:
548 550 return
549 551
550 552 maxqueue = ui.configint('worker', 'backgroundclosemaxqueue')
551 553 threadcount = ui.configint('worker', 'backgroundclosethreadcount')
552 554
553 555 ui.debug('starting %d threads for background file closing\n' %
554 556 threadcount)
555 557
556 558 self._queue = util.queue(maxsize=maxqueue)
557 559 self._running = True
558 560
559 561 for i in range(threadcount):
560 562 t = threading.Thread(target=self._worker, name='backgroundcloser')
561 563 self._threads.append(t)
562 564 t.start()
563 565
564 566 def __enter__(self):
565 567 self._entered = True
566 568 return self
567 569
568 570 def __exit__(self, exc_type, exc_value, exc_tb):
569 571 self._running = False
570 572
571 573 # Wait for threads to finish closing so open files don't linger for
572 574 # longer than lifetime of context manager.
573 575 for t in self._threads:
574 576 t.join()
575 577
576 578 def _worker(self):
577 579 """Main routine for worker thread."""
578 580 while True:
579 581 try:
580 582 fh = self._queue.get(block=True, timeout=0.100)
581 583 # Need to catch errors or the thread will terminate and
582 584 # we could orphan file descriptors.
583 585 try:
584 586 fh.close()
585 587 except Exception as e:
586 588 # Stash so can re-raise from main thread later.
587 589 self._threadexception = e
588 590 except util.empty:
589 591 if not self._running:
590 592 break
591 593
592 594 def close(self, fh):
593 595 """Schedule a file for closing."""
594 596 if not self._entered:
595 597 raise error.Abort(_('can only call close() when context manager '
596 598 'active'))
597 599
598 600 # If a background thread encountered an exception, raise now so we fail
599 601 # fast. Otherwise we may potentially go on for minutes until the error
600 602 # is acted on.
601 603 if self._threadexception:
602 604 e = self._threadexception
603 605 self._threadexception = None
604 606 raise e
605 607
606 608 # If we're not actively running, close synchronously.
607 609 if not self._running:
608 610 fh.close()
609 611 return
610 612
611 613 self._queue.put(fh, block=True, timeout=None)
612 614
613 615 class checkambigatclosing(closewrapbase):
614 616 """Proxy for a file object, to avoid ambiguity of file stat
615 617
616 618 See also util.filestat for detail about "ambiguity of file stat".
617 619
618 620 This proxy is useful only if the target file is guarded by any
619 621 lock (e.g. repo.lock or repo.wlock)
620 622
621 623 Do not instantiate outside of the vfs layer.
622 624 """
623 625 def __init__(self, fh):
624 626 super(checkambigatclosing, self).__init__(fh)
625 627 object.__setattr__(self, r'_oldstat', util.filestat.frompath(fh.name))
626 628
627 629 def _checkambig(self):
628 630 oldstat = self._oldstat
629 631 if oldstat.stat:
630 632 newstat = util.filestat.frompath(self._origfh.name)
631 633 if newstat.isambig(oldstat):
632 634 # stat of changed file is ambiguous to original one
633 635 newstat.avoidambig(self._origfh.name, oldstat)
634 636
635 637 def __exit__(self, exc_type, exc_value, exc_tb):
636 638 self._origfh.__exit__(exc_type, exc_value, exc_tb)
637 639 self._checkambig()
638 640
639 641 def close(self):
640 642 self._origfh.close()
641 643 self._checkambig()