##// END OF EJS Templates
streamclone: rework canperformstreamclone...
Boris Feld -
r35775:bbf7abd0 default
parent child Browse files
Show More
@@ -1,542 +1,541 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 phases,
17 17 store,
18 18 util,
19 19 )
20 20
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the server can stream clone over bundle2; 'v2' is
    # the only stream capability version we understand.
    bundle2supported = False
    if pullop.canusebundle2:
        if 'v2' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # Otherwise the server lacks bundle2 stream clone support (or only
        # offers versions we don't speak); fall back and possibly allow the
        # legacy protocol.

    # The legacy code path must step aside whenever bundle2 is available.
    if bundle2supported and not bundle2:
        return False, None
    # Conversely, the bundle2 path must not attempt a stream clone the
    # server can't provide.
    elif bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    wantstream = pullop.streamclonerequested

    # With no explicit client preference, defer to the server. This likely
    # only comes into play on LANs.
    if wantstream is None:
        # The server can advertise whether to prefer streaming clone.
        wantstream = remote.capable('stream-preferred')

    if not wantstream:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    reqs = set()
    if remote.capable('stream'):
        reqs.add('revlogv1')
    else:
        serverreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not serverreqs:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but server has them '
                'disabled\n'))
            return False, None

        serverreqs = set(serverreqs.split(','))
        # Server requires something we don't support. Bail.
        missing = serverreqs - repo.supportedformats
        if missing:
            pullop.repo.ui.warn(_(
                'warning: stream clone requested but client is missing '
                'requirements: %s\n') % ', '.join(sorted(missing)))
            pullop.repo.ui.warn(
                _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
                  'for more information)\n'))
            return False, None
        reqs = serverreqs

    return True, reqs
103 102
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Grab the remote's branchmap up front; it is reused below to speed up
    # branch cache creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()

    # The first response line is a status code.
    line = fp.readline()
    try:
        resp = int(line)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # The second line carries the file count and total byte count.
    line = fp.readline()
    try:
        filecount, bytecount = map(int, line.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
165 164
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default; require an explicit opt-in when secrets
    # are present.
    if phases.hassecret(repo):
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
178 177
# Kept as its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()
182 181
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    totalsize = 0
    # Hold the lock during the scan so we snapshot a consistent store.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if not size:
                continue
            entries.append((name, size))
            totalsize += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), totalsize))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # Auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive.
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small file: one read is enough.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), totalsize, emitrevlogdata()
234 233
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        # Stream generation is administratively disabled.
        yield '1\n'
        return

    try:
        filecount, bytecount, it = generatev1(repo)
    except error.LockError:
        # Failed to lock the repository.
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
261 260
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        # Magic followed by the (currently fixed) compression marker.
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Counts, then the NUL-terminated requirements string.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        emitted = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            emitted += len(chunk)
            repo.ui.progress(_('bundle'), emitted, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()
317 316
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handledbytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    line = fp.readline()
                    try:
                        name, size = line.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), line)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handledbytes += len(chunk)
                            repo.ui.progress(_('clone'), handledbytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
382 381
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    Assumes the 4 byte magic has already been consumed. Returns a tuple of
    (file count, byte count, requirements set).
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    # The requirements string must be NUL terminated per the bundle format.
    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
400 399
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # Stream clones overwrite store files wholesale, so only an empty
    # destination is acceptable.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
419 418
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Consume the wrapped bundle into ``repo``."""
        return applybundlev1(repo, self._fh)
431 430
def _emit(repo, entries, totalfilesize):
    """actually emit the stream bundle"""
    progress = repo.ui.progress
    progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
    vfs = repo.svfs
    try:
        seen = 0
        for name, size in entries:
            # Per-entry layout: varint(len(name)), varint(size), name, data.
            yield util.uvarintencode(len(name))
            fp = vfs(name)
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    # Small file: read it in one shot.
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress(_('bundle'), seen, total=totalfilesize,
                             unit=_('bytes'))
                    yield chunk
            finally:
                fp.close()
    finally:
        # Always clear the progress bar, even on error.
        progress(_('bundle'), None)
458 457
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    the data stream consists the following entries:
    1) A varint containing the length of the filename
    2) A varint containing the length of file data
    3) N bytes containing the filename (the internal, store-agnostic form)
    4) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """
    with repo.lock():
        entries = []
        totalfilesize = 0

        # Scan while locked so the snapshot is consistent.
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if not size:
                continue
            entries.append((name, size))
            totalfilesize += size

        chunks = _emit(repo, entries, totalfilesize)

    return len(entries), totalfilesize, chunks
485 484
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        handledbytes = 0
        progress = repo.ui.progress

        progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))

        vfs = repo.svfs

        with repo.transaction('clone'):
            with vfs.backgroundclosing(repo.ui):
                for i in range(filecount):
                    # Entry header: two varints, then the filename itself.
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = fp.read(namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            handledbytes += len(chunk)
                            progress(_('clone'), handledbytes, total=filesize,
                                     unit=_('bytes'))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            elapsed = 0.001
        progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(handledbytes), elapsed,
                        util.bytecount(handledbytes / elapsed)))
534 533
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply the content from a version 2 stream clone bundle.

    ``requirements`` are checked against ``repo.supported`` (not just the
    format requirements) before any data is consumed.
    """
    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)
General Comments 0
You need to be logged in to leave comments. Login now