Show More
@@ -428,3 +428,115 b' class streamcloneapplier(object):' | |||||
428 |
|
428 | |||
429 | def apply(self, repo): |
|
429 | def apply(self, repo): | |
430 | return applybundlev1(repo, self._fh) |
|
430 | return applybundlev1(repo, self._fh) | |
|
431 | ||||
|
432 | def _emit(repo, entries, totalfilesize): | |||
|
433 | """actually emit the stream bundle""" | |||
|
434 | progress = repo.ui.progress | |||
|
435 | progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) | |||
|
436 | vfs = repo.svfs | |||
|
437 | try: | |||
|
438 | seen = 0 | |||
|
439 | for name, size in entries: | |||
|
440 | yield util.uvarintencode(len(name)) | |||
|
441 | fp = vfs(name) | |||
|
442 | try: | |||
|
443 | yield util.uvarintencode(size) | |||
|
444 | yield name | |||
|
445 | if size <= 65536: | |||
|
446 | chunks = (fp.read(size),) | |||
|
447 | else: | |||
|
448 | chunks = util.filechunkiter(fp, limit=size) | |||
|
449 | for chunk in chunks: | |||
|
450 | seen += len(chunk) | |||
|
451 | progress(_('bundle'), seen, total=totalfilesize, | |||
|
452 | unit=_('bytes')) | |||
|
453 | yield chunk | |||
|
454 | finally: | |||
|
455 | fp.close() | |||
|
456 | finally: | |||
|
457 | progress(_('bundle'), None) | |||
|
458 | ||||
|
459 | def generatev2(repo): | |||
|
460 | """Emit content for version 2 of a streaming clone. | |||
|
461 | ||||
|
462 | the data stream consists the following entries: | |||
|
463 | 1) A varint containing the length of the filename | |||
|
464 | 2) A varint containing the length of file data | |||
|
465 | 3) N bytes containing the filename (the internal, store-agnostic form) | |||
|
466 | 4) N bytes containing the file data | |||
|
467 | ||||
|
468 | Returns a 3-tuple of (file count, file size, data iterator). | |||
|
469 | """ | |||
|
470 | ||||
|
471 | with repo.lock(): | |||
|
472 | ||||
|
473 | entries = [] | |||
|
474 | totalfilesize = 0 | |||
|
475 | ||||
|
476 | repo.ui.debug('scanning\n') | |||
|
477 | for name, ename, size in _walkstreamfiles(repo): | |||
|
478 | if size: | |||
|
479 | entries.append((name, size)) | |||
|
480 | totalfilesize += size | |||
|
481 | ||||
|
482 | chunks = _emit(repo, entries, totalfilesize) | |||
|
483 | ||||
|
484 | return len(entries), totalfilesize, chunks | |||
|
485 | ||||
|
486 | def consumev2(repo, fp, filecount, filesize): | |||
|
487 | """Apply the contents from a version 2 streaming clone. | |||
|
488 | ||||
|
489 | Data is read from an object that only needs to provide a ``read(size)`` | |||
|
490 | method. | |||
|
491 | """ | |||
|
492 | with repo.lock(): | |||
|
493 | repo.ui.status(_('%d files to transfer, %s of data\n') % | |||
|
494 | (filecount, util.bytecount(filesize))) | |||
|
495 | ||||
|
496 | start = util.timer() | |||
|
497 | handledbytes = 0 | |||
|
498 | progress = repo.ui.progress | |||
|
499 | ||||
|
500 | progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) | |||
|
501 | ||||
|
502 | vfs = repo.svfs | |||
|
503 | ||||
|
504 | with repo.transaction('clone'): | |||
|
505 | with vfs.backgroundclosing(repo.ui): | |||
|
506 | for i in range(filecount): | |||
|
507 | namelen = util.uvarintdecodestream(fp) | |||
|
508 | datalen = util.uvarintdecodestream(fp) | |||
|
509 | ||||
|
510 | name = fp.read(namelen) | |||
|
511 | ||||
|
512 | if repo.ui.debugflag: | |||
|
513 | repo.ui.debug('adding %s (%s)\n' % | |||
|
514 | (name, util.bytecount(datalen))) | |||
|
515 | ||||
|
516 | with vfs(name, 'w') as ofp: | |||
|
517 | for chunk in util.filechunkiter(fp, limit=datalen): | |||
|
518 | handledbytes += len(chunk) | |||
|
519 | progress(_('clone'), handledbytes, total=filesize, | |||
|
520 | unit=_('bytes')) | |||
|
521 | ofp.write(chunk) | |||
|
522 | ||||
|
523 | # force @filecache properties to be reloaded from | |||
|
524 | # streamclone-ed file at next access | |||
|
525 | repo.invalidate(clearfilecache=True) | |||
|
526 | ||||
|
527 | elapsed = util.timer() - start | |||
|
528 | if elapsed <= 0: | |||
|
529 | elapsed = 0.001 | |||
|
530 | progress(_('clone'), None) | |||
|
531 | repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % | |||
|
532 | (util.bytecount(handledbytes), elapsed, | |||
|
533 | util.bytecount(handledbytes / elapsed))) | |||
|
534 | ||||
|
535 | def applybundlev2(repo, fp, filecount, filesize, requirements): | |||
|
536 | missingreqs = [r for r in requirements if r not in repo.supported] | |||
|
537 | if missingreqs: | |||
|
538 | raise error.Abort(_('unable to apply stream clone: ' | |||
|
539 | 'unsupported format: %s') % | |||
|
540 | ', '.join(sorted(missingreqs))) | |||
|
541 | ||||
|
542 | consumev2(repo, fp, filecount, filesize) |
General Comments 0
You need to be logged in to leave comments.
Login now