##// END OF EJS Templates
streamclone: define first iteration of version 2 of stream format...
Boris Feld -
r35774:cfdccd56 default
parent child Browse files
Show More
@@ -428,3 +428,115 b' class streamcloneapplier(object):'
428
428
429 def apply(self, repo):
429 def apply(self, repo):
430 return applybundlev1(repo, self._fh)
430 return applybundlev1(repo, self._fh)
431
432 def _emit(repo, entries, totalfilesize):
433 """actually emit the stream bundle"""
434 progress = repo.ui.progress
435 progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
436 vfs = repo.svfs
437 try:
438 seen = 0
439 for name, size in entries:
440 yield util.uvarintencode(len(name))
441 fp = vfs(name)
442 try:
443 yield util.uvarintencode(size)
444 yield name
445 if size <= 65536:
446 chunks = (fp.read(size),)
447 else:
448 chunks = util.filechunkiter(fp, limit=size)
449 for chunk in chunks:
450 seen += len(chunk)
451 progress(_('bundle'), seen, total=totalfilesize,
452 unit=_('bytes'))
453 yield chunk
454 finally:
455 fp.close()
456 finally:
457 progress(_('bundle'), None)
458
459 def generatev2(repo):
460 """Emit content for version 2 of a streaming clone.
461
462 the data stream consists the following entries:
463 1) A varint containing the length of the filename
464 2) A varint containing the length of file data
465 3) N bytes containing the filename (the internal, store-agnostic form)
466 4) N bytes containing the file data
467
468 Returns a 3-tuple of (file count, file size, data iterator).
469 """
470
471 with repo.lock():
472
473 entries = []
474 totalfilesize = 0
475
476 repo.ui.debug('scanning\n')
477 for name, ename, size in _walkstreamfiles(repo):
478 if size:
479 entries.append((name, size))
480 totalfilesize += size
481
482 chunks = _emit(repo, entries, totalfilesize)
483
484 return len(entries), totalfilesize, chunks
485
486 def consumev2(repo, fp, filecount, filesize):
487 """Apply the contents from a version 2 streaming clone.
488
489 Data is read from an object that only needs to provide a ``read(size)``
490 method.
491 """
492 with repo.lock():
493 repo.ui.status(_('%d files to transfer, %s of data\n') %
494 (filecount, util.bytecount(filesize)))
495
496 start = util.timer()
497 handledbytes = 0
498 progress = repo.ui.progress
499
500 progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
501
502 vfs = repo.svfs
503
504 with repo.transaction('clone'):
505 with vfs.backgroundclosing(repo.ui):
506 for i in range(filecount):
507 namelen = util.uvarintdecodestream(fp)
508 datalen = util.uvarintdecodestream(fp)
509
510 name = fp.read(namelen)
511
512 if repo.ui.debugflag:
513 repo.ui.debug('adding %s (%s)\n' %
514 (name, util.bytecount(datalen)))
515
516 with vfs(name, 'w') as ofp:
517 for chunk in util.filechunkiter(fp, limit=datalen):
518 handledbytes += len(chunk)
519 progress(_('clone'), handledbytes, total=filesize,
520 unit=_('bytes'))
521 ofp.write(chunk)
522
523 # force @filecache properties to be reloaded from
524 # streamclone-ed file at next access
525 repo.invalidate(clearfilecache=True)
526
527 elapsed = util.timer() - start
528 if elapsed <= 0:
529 elapsed = 0.001
530 progress(_('clone'), None)
531 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
532 (util.bytecount(handledbytes), elapsed,
533 util.bytecount(handledbytes / elapsed)))
534
535 def applybundlev2(repo, fp, filecount, filesize, requirements):
536 missingreqs = [r for r in requirements if r not in repo.supported]
537 if missingreqs:
538 raise error.Abort(_('unable to apply stream clone: '
539 'unsupported format: %s') %
540 ', '.join(sorted(missingreqs)))
541
542 consumev2(repo, fp, filecount, filesize)
General Comments 0
You need to be logged in to leave comments. Login now