##// END OF EJS Templates
streamclone: stop using 'vfs.mustaudit = False'...
marmoute -
r33256:761ccfef default
parent child Browse files
Show More
@@ -1,418 +1,413 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 phases,
17 17 store,
18 18 util,
19 19 )
20 20
21 21 def canperformstreamclone(pullop, bailifbundle2supported=False):
22 22 """Whether it is possible to perform a streaming clone as part of pull.
23 23
24 24 ``bailifbundle2supported`` will cause the function to return False if
25 25 bundle2 stream clones are supported. It should only be called by the
26 26 legacy stream clone code path.
27 27
28 28 Returns a tuple of (supported, requirements). ``supported`` is True if
29 29 streaming clone is supported and False otherwise. ``requirements`` is
30 30 a set of repo requirements from the remote, or ``None`` if stream clone
31 31 isn't supported.
32 32 """
33 33 repo = pullop.repo
34 34 remote = pullop.remote
35 35
36 36 bundle2supported = False
37 37 if pullop.canusebundle2:
38 38 if 'v1' in pullop.remotebundle2caps.get('stream', []):
39 39 bundle2supported = True
40 40 # else
41 41 # Server doesn't support bundle2 stream clone or doesn't support
42 42 # the versions we support. Fall back and possibly allow legacy.
43 43
44 44 # Ensures legacy code path uses available bundle2.
45 45 if bailifbundle2supported and bundle2supported:
46 46 return False, None
47 47 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
48 48 #elif not bailifbundle2supported and not bundle2supported:
49 49 # return False, None
50 50
51 51 # Streaming clone only works on empty repositories.
52 52 if len(repo):
53 53 return False, None
54 54
55 55 # Streaming clone only works if all data is being requested.
56 56 if pullop.heads:
57 57 return False, None
58 58
59 59 streamrequested = pullop.streamclonerequested
60 60
61 61 # If we don't have a preference, let the server decide for us. This
62 62 # likely only comes into play in LANs.
63 63 if streamrequested is None:
64 64 # The server can advertise whether to prefer streaming clone.
65 65 streamrequested = remote.capable('stream-preferred')
66 66
67 67 if not streamrequested:
68 68 return False, None
69 69
70 70 # In order for stream clone to work, the client has to support all the
71 71 # requirements advertised by the server.
72 72 #
73 73 # The server advertises its requirements via the "stream" and "streamreqs"
74 74 # capability. "stream" (a value-less capability) is advertised if and only
75 75 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
76 76 # is advertised and contains a comma-delimited list of requirements.
77 77 requirements = set()
78 78 if remote.capable('stream'):
79 79 requirements.add('revlogv1')
80 80 else:
81 81 streamreqs = remote.capable('streamreqs')
82 82 # This is weird and shouldn't happen with modern servers.
83 83 if not streamreqs:
84 84 pullop.repo.ui.warn(_(
85 85 'warning: stream clone requested but server has them '
86 86 'disabled\n'))
87 87 return False, None
88 88
89 89 streamreqs = set(streamreqs.split(','))
90 90 # Server requires something we don't support. Bail.
91 91 missingreqs = streamreqs - repo.supportedformats
92 92 if missingreqs:
93 93 pullop.repo.ui.warn(_(
94 94 'warning: stream clone requested but client is missing '
95 95 'requirements: %s\n') % ', '.join(sorted(missingreqs)))
96 96 pullop.repo.ui.warn(
97 97 _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
98 98 'for more information)\n'))
99 99 return False, None
100 100 requirements = streamreqs
101 101
102 102 return True, requirements
103 103
104 104 def maybeperformlegacystreamclone(pullop):
105 105 """Possibly perform a legacy stream clone operation.
106 106
107 107 Legacy stream clones are performed as part of pull but before all other
108 108 operations.
109 109
110 110 A legacy stream clone will not be performed if a bundle2 stream clone is
111 111 supported.
112 112 """
113 113 supported, requirements = canperformstreamclone(pullop)
114 114
115 115 if not supported:
116 116 return
117 117
118 118 repo = pullop.repo
119 119 remote = pullop.remote
120 120
121 121 # Save remote branchmap. We will use it later to speed up branchcache
122 122 # creation.
123 123 rbranchmap = None
124 124 if remote.capable('branchmap'):
125 125 rbranchmap = remote.branchmap()
126 126
127 127 repo.ui.status(_('streaming all changes\n'))
128 128
129 129 fp = remote.stream_out()
130 130 l = fp.readline()
131 131 try:
132 132 resp = int(l)
133 133 except ValueError:
134 134 raise error.ResponseError(
135 135 _('unexpected response from remote server:'), l)
136 136 if resp == 1:
137 137 raise error.Abort(_('operation forbidden by server'))
138 138 elif resp == 2:
139 139 raise error.Abort(_('locking the remote repository failed'))
140 140 elif resp != 0:
141 141 raise error.Abort(_('the server sent an unknown error code'))
142 142
143 143 l = fp.readline()
144 144 try:
145 145 filecount, bytecount = map(int, l.split(' ', 1))
146 146 except (ValueError, TypeError):
147 147 raise error.ResponseError(
148 148 _('unexpected response from remote server:'), l)
149 149
150 150 with repo.lock():
151 151 consumev1(repo, fp, filecount, bytecount)
152 152
153 153 # new requirements = old non-format requirements +
154 154 # new format-related remote requirements
155 155 # requirements from the streamed-in repository
156 156 repo.requirements = requirements | (
157 157 repo.requirements - repo.supportedformats)
158 158 repo._applyopenerreqs()
159 159 repo._writerequirements()
160 160
161 161 if rbranchmap:
162 162 branchmap.replacecache(repo, rbranchmap)
163 163
164 164 repo.invalidate()
165 165
166 166 def allowservergeneration(repo):
167 167 """Whether streaming clones are allowed from the server."""
168 168 if not repo.ui.configbool('server', 'uncompressed', True, untrusted=True):
169 169 return False
170 170
171 171 # The way stream clone works makes it impossible to hide secret changesets.
172 172 # So don't allow this by default.
173 173 secret = phases.hassecret(repo)
174 174 if secret:
175 175 return repo.ui.configbool('server', 'uncompressedallowsecret')
176 176
177 177 return True
178 178
179 179 # This is its own function so extensions can override it.
180 180 def _walkstreamfiles(repo):
181 181 return repo.store.walk()
182 182
183 183 def generatev1(repo):
184 184 """Emit content for version 1 of a streaming clone.
185 185
186 186 This returns a 3-tuple of (file count, byte size, data iterator).
187 187
188 188 The data iterator consists of N entries for each file being transferred.
189 189 Each file entry starts as a line with the file name and integer size
190 190 delimited by a null byte.
191 191
192 192 The raw file data follows. Following the raw file data is the next file
193 193 entry, or EOF.
194 194
195 195 When used on the wire protocol, an additional line indicating protocol
196 196 success will be prepended to the stream. This function is not responsible
197 197 for adding it.
198 198
199 199 This function will obtain a repository lock to ensure a consistent view of
200 200 the store is captured. It therefore may raise LockError.
201 201 """
202 202 entries = []
203 203 total_bytes = 0
204 204 # Get consistent snapshot of repo, lock during scan.
205 205 with repo.lock():
206 206 repo.ui.debug('scanning\n')
207 207 for name, ename, size in _walkstreamfiles(repo):
208 208 if size:
209 209 entries.append((name, size))
210 210 total_bytes += size
211 211
212 212 repo.ui.debug('%d files, %d bytes to transfer\n' %
213 213 (len(entries), total_bytes))
214 214
215 215 svfs = repo.svfs
216 oldaudit = svfs.mustaudit
217 216 debugflag = repo.ui.debugflag
218 svfs.mustaudit = False
219 217
220 218 def emitrevlogdata():
221 try:
222 for name, size in entries:
223 if debugflag:
224 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
225 # partially encode name over the wire for backwards compat
226 yield '%s\0%d\n' % (store.encodedir(name), size)
227 if size <= 65536:
228 with svfs(name, 'rb', auditpath=False) as fp:
229 yield fp.read(size)
230 else:
231 data = svfs(name, auditpath=False)
232 for chunk in util.filechunkiter(data, limit=size):
233 yield chunk
234 finally:
235 svfs.mustaudit = oldaudit
219 for name, size in entries:
220 if debugflag:
221 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
222 # partially encode name over the wire for backwards compat
223 yield '%s\0%d\n' % (store.encodedir(name), size)
224 if size <= 65536:
225 with svfs(name, 'rb', auditpath=False) as fp:
226 yield fp.read(size)
227 else:
228 data = svfs(name, auditpath=False)
229 for chunk in util.filechunkiter(data, limit=size):
230 yield chunk
236 231
237 232 return len(entries), total_bytes, emitrevlogdata()
238 233
239 234 def generatev1wireproto(repo):
240 235 """Emit content for version 1 of streaming clone suitable for the wire.
241 236
242 237 This is the data output from ``generatev1()`` with a header line
243 238 indicating file count and byte size.
244 239 """
245 240 filecount, bytecount, it = generatev1(repo)
246 241 yield '%d %d\n' % (filecount, bytecount)
247 242 for chunk in it:
248 243 yield chunk
249 244
250 245 def generatebundlev1(repo, compression='UN'):
251 246 """Emit content for version 1 of a stream clone bundle.
252 247
253 248 The first 4 bytes of the output ("HGS1") denote this as stream clone
254 249 bundle version 1.
255 250
256 251 The next 2 bytes indicate the compression type. Only "UN" is currently
257 252 supported.
258 253
259 254 The next 16 bytes are two 64-bit big endian unsigned integers indicating
260 255 file count and byte count, respectively.
261 256
262 257 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
263 258 of the requirements string, including a trailing \0. The following N bytes
264 259 are the requirements string, which is ASCII containing a comma-delimited
265 260 list of repo requirements that are needed to support the data.
266 261
267 262 The remaining content is the output of ``generatev1()`` (which may be
268 263 compressed in the future).
269 264
270 265 Returns a tuple of (requirements, data generator).
271 266 """
272 267 if compression != 'UN':
273 268 raise ValueError('we do not support the compression argument yet')
274 269
275 270 requirements = repo.requirements & repo.supportedformats
276 271 requires = ','.join(sorted(requirements))
277 272
278 273 def gen():
279 274 yield 'HGS1'
280 275 yield compression
281 276
282 277 filecount, bytecount, it = generatev1(repo)
283 278 repo.ui.status(_('writing %d bytes for %d files\n') %
284 279 (bytecount, filecount))
285 280
286 281 yield struct.pack('>QQ', filecount, bytecount)
287 282 yield struct.pack('>H', len(requires) + 1)
288 283 yield requires + '\0'
289 284
290 285 # This is where we'll add compression in the future.
291 286 assert compression == 'UN'
292 287
293 288 seen = 0
294 289 repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))
295 290
296 291 for chunk in it:
297 292 seen += len(chunk)
298 293 repo.ui.progress(_('bundle'), seen, total=bytecount,
299 294 unit=_('bytes'))
300 295 yield chunk
301 296
302 297 repo.ui.progress(_('bundle'), None)
303 298
304 299 return requirements, gen()
305 300
306 301 def consumev1(repo, fp, filecount, bytecount):
307 302 """Apply the contents from version 1 of a streaming clone file handle.
308 303
309 304 This takes the output from "stream_out" and applies it to the specified
310 305 repository.
311 306
312 307 Like "stream_out," the status line added by the wire protocol is not
313 308 handled by this function.
314 309 """
315 310 with repo.lock():
316 311 repo.ui.status(_('%d files to transfer, %s of data\n') %
317 312 (filecount, util.bytecount(bytecount)))
318 313 handled_bytes = 0
319 314 repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
320 315 start = util.timer()
321 316
322 317 # TODO: get rid of (potential) inconsistency
323 318 #
324 319 # If transaction is started and any @filecache property is
325 320 # changed at this point, it causes inconsistency between
326 321 # in-memory cached property and streamclone-ed file on the
327 322 # disk. Nested transaction prevents transaction scope "clone"
328 323 # below from writing in-memory changes out at the end of it,
329 324 # even though in-memory changes are discarded at the end of it
330 325 # regardless of transaction nesting.
331 326 #
332 327 # But transaction nesting can't be simply prohibited, because
333 328 # nesting occurs also in ordinary case (e.g. enabling
334 329 # clonebundles).
335 330
336 331 with repo.transaction('clone'):
337 332 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
338 333 for i in xrange(filecount):
339 334 # XXX doesn't support '\n' or '\r' in filenames
340 335 l = fp.readline()
341 336 try:
342 337 name, size = l.split('\0', 1)
343 338 size = int(size)
344 339 except (ValueError, TypeError):
345 340 raise error.ResponseError(
346 341 _('unexpected response from remote server:'), l)
347 342 if repo.ui.debugflag:
348 343 repo.ui.debug('adding %s (%s)\n' %
349 344 (name, util.bytecount(size)))
350 345 # for backwards compat, name was partially encoded
351 346 path = store.decodedir(name)
352 347 with repo.svfs(path, 'w', backgroundclose=True) as ofp:
353 348 for chunk in util.filechunkiter(fp, limit=size):
354 349 handled_bytes += len(chunk)
355 350 repo.ui.progress(_('clone'), handled_bytes,
356 351 total=bytecount, unit=_('bytes'))
357 352 ofp.write(chunk)
358 353
359 354 # force @filecache properties to be reloaded from
360 355 # streamclone-ed file at next access
361 356 repo.invalidate(clearfilecache=True)
362 357
363 358 elapsed = util.timer() - start
364 359 if elapsed <= 0:
365 360 elapsed = 0.001
366 361 repo.ui.progress(_('clone'), None)
367 362 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
368 363 (util.bytecount(bytecount), elapsed,
369 364 util.bytecount(bytecount / elapsed)))
370 365
371 366 def readbundle1header(fp):
372 367 compression = fp.read(2)
373 368 if compression != 'UN':
374 369 raise error.Abort(_('only uncompressed stream clone bundles are '
375 370 'supported; got %s') % compression)
376 371
377 372 filecount, bytecount = struct.unpack('>QQ', fp.read(16))
378 373 requireslen = struct.unpack('>H', fp.read(2))[0]
379 374 requires = fp.read(requireslen)
380 375
381 376 if not requires.endswith('\0'):
382 377 raise error.Abort(_('malformed stream clone bundle: '
383 378 'requirements not properly encoded'))
384 379
385 380 requirements = set(requires.rstrip('\0').split(','))
386 381
387 382 return filecount, bytecount, requirements
388 383
389 384 def applybundlev1(repo, fp):
390 385 """Apply the content from a stream clone bundle version 1.
391 386
392 387 We assume the 4 byte header has been read and validated and the file handle
393 388 is at the 2 byte compression identifier.
394 389 """
395 390 if len(repo):
396 391 raise error.Abort(_('cannot apply stream clone bundle on non-empty '
397 392 'repo'))
398 393
399 394 filecount, bytecount, requirements = readbundle1header(fp)
400 395 missingreqs = requirements - repo.supportedformats
401 396 if missingreqs:
402 397 raise error.Abort(_('unable to apply stream clone: '
403 398 'unsupported format: %s') %
404 399 ', '.join(sorted(missingreqs)))
405 400
406 401 consumev1(repo, fp, filecount, bytecount)
407 402
408 403 class streamcloneapplier(object):
409 404 """Class to manage applying streaming clone bundles.
410 405
411 406 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
412 407 readers to perform bundle type-specific functionality.
413 408 """
414 409 def __init__(self, fh):
415 410 self._fh = fh
416 411
417 412 def apply(self, repo):
418 413 return applybundlev1(repo, self._fh)
General Comments 0
You need to be logged in to leave comments. Login now