vfs: simplify path audit disabling in stream clone...
marmoute
r33255:15e9cbe6 default
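For orientation, here is a minimal sketch (not part of the changeset) contrasting the two mechanisms involved: the old approach toggles the vfs-wide mustaudit flag around the whole copy loop, while the new auditpath keyword argument opts out of auditing for a single open. This changeset switches the stream clone file opens to the per-call form; the vfs-wide toggle itself still appears unchanged in the context below. The helper names emit_flagtoggle and emit_percall are made up; svfs and entries mirror the diff.

    def emit_flagtoggle(svfs, entries):
        # Old mechanism: disable auditing vfs-wide, restore it when done.
        oldaudit = svfs.mustaudit
        svfs.mustaudit = False
        try:
            for name, size in entries:
                with svfs(name, 'rb') as fp:
                    yield fp.read(size)
        finally:
            svfs.mustaudit = oldaudit

    def emit_percall(svfs, entries):
        # New mechanism: skip auditing for each open only; no shared vfs
        # state is mutated, so nothing needs restoring on error.
        for name, size in entries:
            with svfs(name, 'rb', auditpath=False) as fp:
                yield fp.read(size)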
mercurial/streamclone.py
@@ -1,417 +1,418 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import struct

from .i18n import _
from . import (
    branchmap,
    error,
    phases,
    store,
    util,
)

def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        missingreqs = streamreqs - repo.supportedformats
        if missingreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missingreqs)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        requirements = streamreqs

    return True, requirements

def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()

def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', True, untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    secret = phases.hassecret(repo)
    if secret:
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True

# This is it's own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
-                    with svfs(name, 'rb') as fp:
+                    with svfs(name, 'rb', auditpath=False) as fp:
                        yield fp.read(size)
                else:
-                    for chunk in util.filechunkiter(svfs(name), limit=size):
+                    data = svfs(name, auditpath=False)
+                    for chunk in util.filechunkiter(data, limit=size):
                        yield chunk
        finally:
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()

def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()

def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
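As a reading aid between the two files, the v1 stream emitted by generatev1() above is simple enough to parse in a few lines. The sketch below follows the format documented in the docstrings and mirrored by consumev1(): a "<filecount> <bytecount>" header line (added by generatev1wireproto()), then per file a "<name>\0<size>" line followed by exactly size raw bytes. parsestreamv1 is a hypothetical helper for illustration, not a Mercurial API.

    def parsestreamv1(fp):
        # Header line: "<filecount> <bytecount>\n".
        filecount, bytecount = map(int, fp.readline().split(' ', 1))
        for _i in xrange(filecount):
            # Entry line: "<name>\0<size>\n"; int() tolerates the newline.
            name, size = fp.readline().split('\0', 1)
            # Wire names are partially encoded (store.encodedir/decodedir).
            yield name, fp.read(int(size))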
mercurial/vfs.py
@@ -1,641 +1,643 @@
# vfs.py - Mercurial 'vfs' classes
#
# Copyright Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import

import contextlib
import errno
import os
import shutil
import stat
import tempfile
import threading

from .i18n import _
from . import (
    error,
    pathutil,
    pycompat,
    util,
)

class abstractvfs(object):
    """Abstract base class; cannot be instantiated"""

    def __init__(self, *args, **kwargs):
        '''Prevent instantiation; don't call this from subclasses.'''
        raise NotImplementedError('attempted instantiating ' + str(type(self)))

    def tryread(self, path):
        '''gracefully return an empty string for missing files'''
        try:
            return self.read(path)
        except IOError as inst:
            if inst.errno != errno.ENOENT:
                raise
        return ""

    def tryreadlines(self, path, mode='rb'):
        '''gracefully return an empty array for missing files'''
        try:
            return self.readlines(path, mode=mode)
        except IOError as inst:
            if inst.errno != errno.ENOENT:
                raise
        return []

    @util.propertycache
    def open(self):
        '''Open ``path`` file, which is relative to vfs root.

        Newly created directories are marked as "not to be indexed by
        the content indexing service", if ``notindexed`` is specified
        for "write" mode access.
        '''
        return self.__call__

    def read(self, path):
        with self(path, 'rb') as fp:
            return fp.read()

    def readlines(self, path, mode='rb'):
        with self(path, mode=mode) as fp:
            return fp.readlines()

    def write(self, path, data, backgroundclose=False):
        with self(path, 'wb', backgroundclose=backgroundclose) as fp:
            return fp.write(data)

    def writelines(self, path, data, mode='wb', notindexed=False):
        with self(path, mode=mode, notindexed=notindexed) as fp:
            return fp.writelines(data)

    def append(self, path, data):
        with self(path, 'ab') as fp:
            return fp.write(data)

    def basename(self, path):
        """return base element of a path (as os.path.basename would do)

        This exists to allow handling of strange encoding if needed."""
        return os.path.basename(path)

    def chmod(self, path, mode):
        return os.chmod(self.join(path), mode)

    def dirname(self, path):
        """return dirname element of a path (as os.path.dirname would do)

        This exists to allow handling of strange encoding if needed."""
        return os.path.dirname(path)

    def exists(self, path=None):
        return os.path.exists(self.join(path))

    def fstat(self, fp):
        return util.fstat(fp)

    def isdir(self, path=None):
        return os.path.isdir(self.join(path))

    def isfile(self, path=None):
        return os.path.isfile(self.join(path))

    def islink(self, path=None):
        return os.path.islink(self.join(path))

    def isfileorlink(self, path=None):
        '''return whether path is a regular file or a symlink

        Unlike isfile, this doesn't follow symlinks.'''
        try:
            st = self.lstat(path)
        except OSError:
            return False
        mode = st.st_mode
        return stat.S_ISREG(mode) or stat.S_ISLNK(mode)

    def reljoin(self, *paths):
        """join various elements of a path together (as os.path.join would do)

        The vfs base is not injected so that path stay relative. This exists
        to allow handling of strange encoding if needed."""
        return os.path.join(*paths)

    def split(self, path):
        """split top-most element of a path (as os.path.split would do)

        This exists to allow handling of strange encoding if needed."""
        return os.path.split(path)

    def lexists(self, path=None):
        return os.path.lexists(self.join(path))

    def lstat(self, path=None):
        return os.lstat(self.join(path))

    def listdir(self, path=None):
        return os.listdir(self.join(path))

    def makedir(self, path=None, notindexed=True):
        return util.makedir(self.join(path), notindexed)

    def makedirs(self, path=None, mode=None):
        return util.makedirs(self.join(path), mode)

    def makelock(self, info, path):
        return util.makelock(info, self.join(path))

    def mkdir(self, path=None):
        return os.mkdir(self.join(path))

    def mkstemp(self, suffix='', prefix='tmp', dir=None, text=False):
        fd, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
                                    dir=self.join(dir), text=text)
        dname, fname = util.split(name)
        if dir:
            return fd, os.path.join(dir, fname)
        else:
            return fd, fname

    def readdir(self, path=None, stat=None, skip=None):
        return util.listdir(self.join(path), stat, skip)

    def readlock(self, path):
        return util.readlock(self.join(path))

    def rename(self, src, dst, checkambig=False):
        """Rename from src to dst

        checkambig argument is used with util.filestat, and is useful
        only if destination file is guarded by any lock
        (e.g. repo.lock or repo.wlock).
        """
        srcpath = self.join(src)
        dstpath = self.join(dst)
        oldstat = checkambig and util.filestat.frompath(dstpath)
        if oldstat and oldstat.stat:
            def dorename(spath, dpath):
                ret = util.rename(spath, dpath)
                newstat = util.filestat.frompath(dpath)
                if newstat.isambig(oldstat):
                    # stat of renamed file is ambiguous to original one
                    return ret, newstat.avoidambig(dpath, oldstat)
                return ret, True
            ret, avoided = dorename(srcpath, dstpath)
            if not avoided:
                # simply copy to change owner of srcpath (see issue5418)
                util.copyfile(dstpath, srcpath)
                ret, avoided = dorename(srcpath, dstpath)
            return ret
        return util.rename(srcpath, dstpath)

    def readlink(self, path):
        return os.readlink(self.join(path))

    def removedirs(self, path=None):
        """Remove a leaf directory and all empty intermediate ones
        """
        return util.removedirs(self.join(path))

    def rmtree(self, path=None, ignore_errors=False, forcibly=False):
        """Remove a directory tree recursively

        If ``forcibly``, this tries to remove READ-ONLY files, too.
        """
        if forcibly:
            def onerror(function, path, excinfo):
                if function is not os.remove:
                    raise
                # read-only files cannot be unlinked under Windows
                s = os.stat(path)
                if (s.st_mode & stat.S_IWRITE) != 0:
                    raise
                os.chmod(path, stat.S_IMODE(s.st_mode) | stat.S_IWRITE)
                os.remove(path)
        else:
            onerror = None
        return shutil.rmtree(self.join(path),
                             ignore_errors=ignore_errors, onerror=onerror)

    def setflags(self, path, l, x):
        return util.setflags(self.join(path), l, x)

    def stat(self, path=None):
        return os.stat(self.join(path))

    def unlink(self, path=None):
        return util.unlink(self.join(path))

    def tryunlink(self, path=None):
        """Attempt to remove a file, ignoring missing file errors."""
        util.tryunlink(self.join(path))

    def unlinkpath(self, path=None, ignoremissing=False):
        return util.unlinkpath(self.join(path), ignoremissing=ignoremissing)

    def utime(self, path=None, t=None):
        return os.utime(self.join(path), t)

    def walk(self, path=None, onerror=None):
        """Yield (dirpath, dirs, files) tuple for each directories under path

        ``dirpath`` is relative one from the root of this vfs. This
        uses ``os.sep`` as path separator, even you specify POSIX
        style ``path``.

        "The root of this vfs" is represented as empty ``dirpath``.
        """
        root = os.path.normpath(self.join(None))
        # when dirpath == root, dirpath[prefixlen:] becomes empty
        # because len(dirpath) < prefixlen.
        prefixlen = len(pathutil.normasprefix(root))
        for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror):
            yield (dirpath[prefixlen:], dirs, files)

    @contextlib.contextmanager
    def backgroundclosing(self, ui, expectedcount=-1):
        """Allow files to be closed asynchronously.

        When this context manager is active, ``backgroundclose`` can be passed
        to ``__call__``/``open`` to result in the file possibly being closed
        asynchronously, on a background thread.
        """
        # This is an arbitrary restriction and could be changed if we ever
        # have a use case.
        vfs = getattr(self, 'vfs', self)
        if getattr(vfs, '_backgroundfilecloser', None):
            raise error.Abort(
                _('can only have 1 active background file closer'))

        with backgroundfilecloser(ui, expectedcount=expectedcount) as bfc:
            try:
                vfs._backgroundfilecloser = bfc
                yield bfc
            finally:
                vfs._backgroundfilecloser = None

class vfs(abstractvfs):
    '''Operate files relative to a base directory

    This class is used to hide the details of COW semantics and
    remote file access from higher level code.
    '''
    def __init__(self, base, audit=True, expandpath=False, realpath=False):
        if expandpath:
            base = util.expandpath(base)
        if realpath:
            base = os.path.realpath(base)
        self.base = base
        self.mustaudit = audit
        self.createmode = None
        self._trustnlink = None

    @property
    def mustaudit(self):
        return self._audit

    @mustaudit.setter
    def mustaudit(self, onoff):
        self._audit = onoff
        if onoff:
            self.audit = pathutil.pathauditor(self.base)
        else:
            self.audit = util.always

    @util.propertycache
    def _cansymlink(self):
        return util.checklink(self.base)

    @util.propertycache
    def _chmod(self):
        return util.checkexec(self.base)

    def _fixfilemode(self, name):
        if self.createmode is None or not self._chmod:
            return
        os.chmod(name, self.createmode & 0o666)

    def __call__(self, path, mode="r", text=False, atomictemp=False,
-                 notindexed=False, backgroundclose=False, checkambig=False):
+                 notindexed=False, backgroundclose=False, checkambig=False,
+                 auditpath=True):
        '''Open ``path`` file, which is relative to vfs root.

        Newly created directories are marked as "not to be indexed by
        the content indexing service", if ``notindexed`` is specified
        for "write" mode access.

        If ``backgroundclose`` is passed, the file may be closed asynchronously.
        It can only be used if the ``self.backgroundclosing()`` context manager
        is active. This should only be specified if the following criteria hold:

        1. There is a potential for writing thousands of files. Unless you
           are writing thousands of files, the performance benefits of
           asynchronously closing files is not realized.
        2. Files are opened exactly once for the ``backgroundclosing``
           active duration and are therefore free of race conditions between
           closing a file on a background thread and reopening it. (If the
           file were opened multiple times, there could be unflushed data
           because the original file handle hasn't been flushed/closed yet.)

        ``checkambig`` argument is passed to atomictemplfile (valid
        only for writing), and is useful only if target file is
        guarded by any lock (e.g. repo.lock or repo.wlock).
        '''
-        if self._audit:
-            r = util.checkosfilename(path)
-            if r:
-                raise error.Abort("%s: %r" % (r, path))
-            self.audit(path)
+        if auditpath:
+            if self._audit:
+                r = util.checkosfilename(path)
+                if r:
+                    raise error.Abort("%s: %r" % (r, path))
+                self.audit(path)
352 f = self.join(path)
354 f = self.join(path)
353
355
354 if not text and "b" not in mode:
356 if not text and "b" not in mode:
355 mode += "b" # for that other OS
357 mode += "b" # for that other OS
356
358
357 nlink = -1
359 nlink = -1
358 if mode not in ('r', 'rb'):
360 if mode not in ('r', 'rb'):
359 dirname, basename = util.split(f)
361 dirname, basename = util.split(f)
360 # If basename is empty, then the path is malformed because it points
362 # If basename is empty, then the path is malformed because it points
361 # to a directory. Let the posixfile() call below raise IOError.
363 # to a directory. Let the posixfile() call below raise IOError.
362 if basename:
364 if basename:
363 if atomictemp:
365 if atomictemp:
364 util.makedirs(dirname, self.createmode, notindexed)
366 util.makedirs(dirname, self.createmode, notindexed)
365 return util.atomictempfile(f, mode, self.createmode,
367 return util.atomictempfile(f, mode, self.createmode,
366 checkambig=checkambig)
368 checkambig=checkambig)
367 try:
369 try:
368 if 'w' in mode:
370 if 'w' in mode:
369 util.unlink(f)
371 util.unlink(f)
370 nlink = 0
372 nlink = 0
371 else:
373 else:
372 # nlinks() may behave differently for files on Windows
374 # nlinks() may behave differently for files on Windows
373 # shares if the file is open.
375 # shares if the file is open.
374 with util.posixfile(f):
376 with util.posixfile(f):
375 nlink = util.nlinks(f)
377 nlink = util.nlinks(f)
376 if nlink < 1:
378 if nlink < 1:
377 nlink = 2 # force mktempcopy (issue1922)
379 nlink = 2 # force mktempcopy (issue1922)
378 except (OSError, IOError) as e:
380 except (OSError, IOError) as e:
379 if e.errno != errno.ENOENT:
381 if e.errno != errno.ENOENT:
380 raise
382 raise
381 nlink = 0
383 nlink = 0
382 util.makedirs(dirname, self.createmode, notindexed)
384 util.makedirs(dirname, self.createmode, notindexed)
383 if nlink > 0:
385 if nlink > 0:
384 if self._trustnlink is None:
386 if self._trustnlink is None:
385 self._trustnlink = nlink > 1 or util.checknlink(f)
387 self._trustnlink = nlink > 1 or util.checknlink(f)
386 if nlink > 1 or not self._trustnlink:
388 if nlink > 1 or not self._trustnlink:
387 util.rename(util.mktempcopy(f), f)
389 util.rename(util.mktempcopy(f), f)
388 fp = util.posixfile(f, mode)
390 fp = util.posixfile(f, mode)
389 if nlink == 0:
391 if nlink == 0:
390 self._fixfilemode(f)
392 self._fixfilemode(f)
391
393
392 if checkambig:
394 if checkambig:
393 if mode in ('r', 'rb'):
395 if mode in ('r', 'rb'):
394 raise error.Abort(_('implementation error: mode %s is not'
396 raise error.Abort(_('implementation error: mode %s is not'
395 ' valid for checkambig=True') % mode)
397 ' valid for checkambig=True') % mode)
396 fp = checkambigatclosing(fp)
398 fp = checkambigatclosing(fp)
397
399
398 if backgroundclose:
400 if backgroundclose:
399 if not self._backgroundfilecloser:
401 if not self._backgroundfilecloser:
400 raise error.Abort(_('backgroundclose can only be used when a '
402 raise error.Abort(_('backgroundclose can only be used when a '
401 'backgroundclosing context manager is active')
403 'backgroundclosing context manager is active')
402 )
404 )
403
405
404 fp = delayclosedfile(fp, self._backgroundfilecloser)
406 fp = delayclosedfile(fp, self._backgroundfilecloser)
405
407
406 return fp
408 return fp
407
409
408 def symlink(self, src, dst):
410 def symlink(self, src, dst):
409 self.audit(dst)
411 self.audit(dst)
410 linkname = self.join(dst)
412 linkname = self.join(dst)
411 util.tryunlink(linkname)
413 util.tryunlink(linkname)
412
414
413 util.makedirs(os.path.dirname(linkname), self.createmode)
415 util.makedirs(os.path.dirname(linkname), self.createmode)
414
416
415 if self._cansymlink:
417 if self._cansymlink:
416 try:
418 try:
417 os.symlink(src, linkname)
419 os.symlink(src, linkname)
418 except OSError as err:
420 except OSError as err:
419 raise OSError(err.errno, _('could not symlink to %r: %s') %
421 raise OSError(err.errno, _('could not symlink to %r: %s') %
420 (src, err.strerror), linkname)
422 (src, err.strerror), linkname)
421 else:
423 else:
422 self.write(dst, src)
424 self.write(dst, src)
423
425
424 def join(self, path, *insidef):
426 def join(self, path, *insidef):
425 if path:
427 if path:
426 return os.path.join(self.base, path, *insidef)
428 return os.path.join(self.base, path, *insidef)
427 else:
429 else:
428 return self.base
430 return self.base
429
431
430 opener = vfs
432 opener = vfs
431
433
432 class auditvfs(object):
434 class auditvfs(object):
433 def __init__(self, vfs):
435 def __init__(self, vfs):
434 self.vfs = vfs
436 self.vfs = vfs
435
437
436 @property
438 @property
437 def mustaudit(self):
439 def mustaudit(self):
438 return self.vfs.mustaudit
440 return self.vfs.mustaudit
439
441
440 @mustaudit.setter
442 @mustaudit.setter
441 def mustaudit(self, onoff):
443 def mustaudit(self, onoff):
442 self.vfs.mustaudit = onoff
444 self.vfs.mustaudit = onoff
443
445
444 @property
446 @property
445 def options(self):
447 def options(self):
446 return self.vfs.options
448 return self.vfs.options
447
449
448 @options.setter
450 @options.setter
449 def options(self, value):
451 def options(self, value):
450 self.vfs.options = value
452 self.vfs.options = value
451
453
class filtervfs(abstractvfs, auditvfs):
    '''Wrapper vfs for filtering filenames with a function.'''

    def __init__(self, vfs, filter):
        auditvfs.__init__(self, vfs)
        self._filter = filter

    def __call__(self, path, *args, **kwargs):
        return self.vfs(self._filter(path), *args, **kwargs)

    def join(self, path, *insidef):
        if path:
            return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
        else:
            return self.vfs.join(path)

filteropener = filtervfs

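A sketch of filtervfs in use, assuming a Mercurial checkout; the lowercasing
filter is a hypothetical example of a path-rewriting function:

    import tempfile

    from mercurial import vfs as vfsmod

    plain = vfsmod.vfs(tempfile.mkdtemp(), audit=False)
    filtered = vfsmod.filtervfs(plain, lambda p: p.lower())

    fp = filtered('DATA/README', 'wb')       # actually opens data/readme
    fp.write(b'x\n')
    fp.close()
    print(filtered.join('DATA'))             # ends with .../data
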
class readonlyvfs(abstractvfs, auditvfs):
    '''Wrapper vfs preventing any writing.'''

    def __init__(self, vfs):
        auditvfs.__init__(self, vfs)

    def __call__(self, path, mode='r', *args, **kw):
        if mode not in ('r', 'rb'):
            raise error.Abort(_('this vfs is read only'))
        return self.vfs(path, mode, *args, **kw)

    def join(self, path, *insidef):
        return self.vfs.join(path, *insidef)

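A sketch of the read-only guard above, assuming a Mercurial checkout; the file
name is hypothetical. The mode check fires before any filesystem access:

    import tempfile

    from mercurial import error
    from mercurial import vfs as vfsmod

    ro = vfsmod.readonlyvfs(vfsmod.vfs(tempfile.mkdtemp(), audit=False))
    try:
        ro('afile', 'wb')                    # any write mode is refused
    except error.Abort:
        print('write rejected')
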
class closewrapbase(object):
    """Base class for wrappers that hook the closing of a file handle

    Do not instantiate outside of the vfs layer.
    """
    def __init__(self, fh):
        object.__setattr__(self, r'_origfh', fh)

    def __getattr__(self, attr):
        return getattr(self._origfh, attr)

    def __setattr__(self, attr, value):
        return setattr(self._origfh, attr, value)

    def __delattr__(self, attr):
        return delattr(self._origfh, attr)

    def __enter__(self):
        return self._origfh.__enter__()

    def __exit__(self, exc_type, exc_value, exc_tb):
        raise NotImplementedError('attempted instantiating ' + str(type(self)))

    def close(self):
        raise NotImplementedError('attempted instantiating ' + str(type(self)))

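A sketch of how a concrete wrapper specializes closewrapbase, assuming a
Mercurial checkout; loggingclosedfile is hypothetical. Subclasses override
__exit__ and close(), while all other attribute access falls through to the
wrapped handle:

    from mercurial.vfs import closewrapbase

    class loggingclosedfile(closewrapbase):
        """Proxy reporting when the underlying handle is closed."""
        def __exit__(self, exc_type, exc_value, exc_tb):
            ret = self._origfh.__exit__(exc_type, exc_value, exc_tb)
            print('closed %s' % self._origfh.name)
            return ret

        def close(self):
            self._origfh.close()
            print('closed %s' % self._origfh.name)
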
class delayclosedfile(closewrapbase):
    """Proxy for a file object whose close is delayed.

    Do not instantiate outside of the vfs layer.
    """
    def __init__(self, fh, closer):
        super(delayclosedfile, self).__init__(fh)
        object.__setattr__(self, r'_closer', closer)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._closer.close(self._origfh)

    def close(self):
        self._closer.close(self._origfh)

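A sketch of the closer protocol delayclosedfile relies on, assuming a Mercurial
checkout; synchronouscloser is a hypothetical stand-in for backgroundfilecloser
and needs only a close(fh) method:

    import tempfile

    from mercurial import vfs as vfsmod

    class synchronouscloser(object):
        """Stand-in closer: closes handles inline, not in the background."""
        def close(self, fh):
            fh.close()

    fh = open(tempfile.mkstemp()[1], 'wb')
    proxy = vfsmod.delayclosedfile(fh, synchronouscloser())
    proxy.write(b'x\n')        # attribute access proxies to the real handle
    proxy.close()              # handed to the closer rather than closed inline
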
class backgroundfilecloser(object):
    """Coordinates background closing of file handles on multiple threads."""
    def __init__(self, ui, expectedcount=-1):
        self._running = False
        self._entered = False
        self._threads = []
        self._threadexception = None

        # Only Windows/NTFS has slow file closing. So only enable by default
        # on that platform. But allow to be enabled elsewhere for testing.
        defaultenabled = pycompat.osname == 'nt'
        enabled = ui.configbool('worker', 'backgroundclose', defaultenabled)

        if not enabled:
            return

        # There is overhead to starting and stopping the background threads.
        # Don't do background processing unless the file count is large enough
        # to justify it.
        minfilecount = ui.configint('worker', 'backgroundcloseminfilecount')
        # FUTURE dynamically start background threads after minfilecount closes.
        # (We don't currently have any callers that don't know their file count)
        if expectedcount > 0 and expectedcount < minfilecount:
            return

        maxqueue = ui.configint('worker', 'backgroundclosemaxqueue')
        threadcount = ui.configint('worker', 'backgroundclosethreadcount')

        ui.debug('starting %d threads for background file closing\n' %
                 threadcount)

        self._queue = util.queue(maxsize=maxqueue)
        self._running = True

        for i in range(threadcount):
            t = threading.Thread(target=self._worker, name='backgroundcloser')
            self._threads.append(t)
            t.start()

    def __enter__(self):
        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._running = False

        # Wait for threads to finish closing so open files don't linger for
        # longer than lifetime of context manager.
        for t in self._threads:
            t.join()

    def _worker(self):
        """Main routine for worker thread."""
        while True:
            try:
                fh = self._queue.get(block=True, timeout=0.100)
                # Need to catch or the thread will terminate and
                # we could orphan file descriptors.
                try:
                    fh.close()
                except Exception as e:
                    # Stash so can re-raise from main thread later.
                    self._threadexception = e
            except util.empty:
                if not self._running:
                    break

    def close(self, fh):
        """Schedule a file for closing."""
        if not self._entered:
            raise error.Abort(_('can only call close() when context manager '
                                'active'))

        # If a background thread encountered an exception, raise now so we fail
        # fast. Otherwise we may potentially go on for minutes until the error
        # is acted on.
        if self._threadexception:
            e = self._threadexception
            self._threadexception = None
            raise e

        # If we're not actively running, close synchronously.
        if not self._running:
            fh.close()
            return

        self._queue.put(fh, block=True, timeout=None)

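A usage sketch, assuming a Mercurial checkout. The config knobs read in
__init__ are forced here so the example behaves the same on every platform;
the values and file names are hypothetical:

    import os
    import tempfile

    from mercurial import ui as uimod
    from mercurial import vfs as vfsmod

    u = uimod.ui.load()
    for knob, value in [('backgroundclose', 'True'),
                        ('backgroundcloseminfilecount', '1'),
                        ('backgroundclosemaxqueue', '384'),
                        ('backgroundclosethreadcount', '4')]:
        u.setconfig('worker', knob, value)

    d = tempfile.mkdtemp()
    with vfsmod.backgroundfilecloser(u, expectedcount=10) as closer:
        for i in range(10):
            fh = open(os.path.join(d, 'f%d' % i), 'wb')
            fh.write(b'x')
            closer.close(fh)      # queued; a worker thread closes it
    # __exit__ joins the workers, so no handle outlives the with block
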
class checkambigatclosing(closewrapbase):
    """Proxy for a file object, to avoid ambiguity of file stat

    See also util.filestat for detail about "ambiguity of file stat".

    This proxy is useful only if the target file is guarded by any
    lock (e.g. repo.lock or repo.wlock).

    Do not instantiate outside of the vfs layer.
    """
    def __init__(self, fh):
        super(checkambigatclosing, self).__init__(fh)
        object.__setattr__(self, r'_oldstat', util.filestat.frompath(fh.name))

    def _checkambig(self):
        oldstat = self._oldstat
        if oldstat.stat:
            newstat = util.filestat.frompath(self._origfh.name)
            if newstat.isambig(oldstat):
                # stat of the changed file is ambiguous with the original one
                newstat.avoidambig(self._origfh.name, oldstat)

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._origfh.__exit__(exc_type, exc_value, exc_tb)
        self._checkambig()

    def close(self):
        self._origfh.close()
        self._checkambig()
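
A worked sketch of the ambiguity this proxy guards against, assuming a
Mercurial checkout. Two same-size writes landing within the same second leave
mtime and size unchanged, so a cache comparing stats cannot see the second
write; avoidambig() advances the mtime to break the tie:

    import tempfile

    from mercurial import util

    path = tempfile.mkstemp()[1]
    with open(path, 'wb') as fh:
        fh.write(b'v1')
    old = util.filestat.frompath(path)

    with open(path, 'wb') as fh:
        fh.write(b'v2')                  # same size, typically same second
    new = util.filestat.frompath(path)

    if new.isambig(old):
        new.avoidambig(path, old)        # what _checkambig() does above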