Show More
@@ -506,6 +506,74 b' class revlog:' | |||||
506 | except FileNotFoundError: |
|
506 | except FileNotFoundError: | |
507 | return b'' |
|
507 | return b'' | |
508 |
|
508 | |||
|
509 | def get_streams(self, max_linkrev): | |||
|
510 | n = len(self) | |||
|
511 | index = self.index | |||
|
512 | while n > 0: | |||
|
513 | linkrev = index[n - 1][4] | |||
|
514 | if linkrev < max_linkrev: | |||
|
515 | break | |||
|
516 | # note: this loop will rarely go through multiple iterations, since | |||
|
517 | # it only traverses commits created during the current streaming | |||
|
518 | # pull operation. | |||
|
519 | # | |||
|
520 | # If this becomes a problem, using a binary search should cap the | |||
|
521 | # runtime of this. | |||
|
522 | n = n - 1 | |||
|
523 | if n == 0: | |||
|
524 | # no data to send | |||
|
525 | return [] | |||
|
526 | index_size = n * index.entry_size | |||
|
527 | data_size = self.end(n - 1) | |||
|
528 | ||||
|
529 | # XXX we might have been split (or stripped) since the object | |||
|
530 | # initialization. We need to close this race too, by having a way to | |||
|
531 | # pre-open the file we feed to the revlog and never closing them before | |||
|
532 | # we are done streaming. | |||
|
533 | ||||
|
534 | if self._inline: | |||
|
535 | ||||
|
536 | def get_stream(): | |||
|
537 | with self._indexfp() as fp: | |||
|
538 | yield None | |||
|
539 | size = index_size + data_size | |||
|
540 | if size <= 65536: | |||
|
541 | yield fp.read(size) | |||
|
542 | else: | |||
|
543 | yield from util.filechunkiter(fp, limit=size) | |||
|
544 | ||||
|
545 | inline_stream = get_stream() | |||
|
546 | next(inline_stream) | |||
|
547 | return [ | |||
|
548 | (self._indexfile, inline_stream, index_size + data_size), | |||
|
549 | ] | |||
|
550 | else: | |||
|
551 | ||||
|
552 | def get_index_stream(): | |||
|
553 | with self._indexfp() as fp: | |||
|
554 | yield None | |||
|
555 | if index_size <= 65536: | |||
|
556 | yield fp.read(index_size) | |||
|
557 | else: | |||
|
558 | yield from util.filechunkiter(fp, limit=index_size) | |||
|
559 | ||||
|
560 | def get_data_stream(): | |||
|
561 | with self._datafp() as fp: | |||
|
562 | yield None | |||
|
563 | if data_size <= 65536: | |||
|
564 | yield fp.read(data_size) | |||
|
565 | else: | |||
|
566 | yield from util.filechunkiter(fp, limit=data_size) | |||
|
567 | ||||
|
568 | index_stream = get_index_stream() | |||
|
569 | next(index_stream) | |||
|
570 | data_stream = get_data_stream() | |||
|
571 | next(data_stream) | |||
|
572 | return [ | |||
|
573 | (self._datafile, data_stream, data_size), | |||
|
574 | (self._indexfile, index_stream, index_size), | |||
|
575 | ] | |||
|
576 | ||||
509 | def _loadindex(self, docket=None): |
|
577 | def _loadindex(self, docket=None): | |
510 |
|
578 | |||
511 | new_header, mmapindexthreshold, force_nodemap = self._init_opts() |
|
579 | new_header, mmapindexthreshold, force_nodemap = self._init_opts() |
@@ -509,7 +509,13 b' class BaseStoreEntry:' | |||||
509 | def files(self) -> List[StoreFile]: |
|
509 | def files(self) -> List[StoreFile]: | |
510 | raise NotImplementedError |
|
510 | raise NotImplementedError | |
511 |
|
511 | |||
512 |
def get_streams( |
|
512 | def get_streams( | |
|
513 | self, | |||
|
514 | repo=None, | |||
|
515 | vfs=None, | |||
|
516 | copies=None, | |||
|
517 | max_changeset=None, | |||
|
518 | ): | |||
513 | """return a list of data stream associated to files for this entry |
|
519 | """return a list of data stream associated to files for this entry | |
514 |
|
520 | |||
515 | return [(unencoded_file_path, content_iterator, content_size), …] |
|
521 | return [(unencoded_file_path, content_iterator, content_size), …] | |
@@ -605,6 +611,57 b' class RevlogStoreEntry(BaseStoreEntry):' | |||||
605 | self._files.append(StoreFile(unencoded_path=path, **data)) |
|
611 | self._files.append(StoreFile(unencoded_path=path, **data)) | |
606 | return self._files |
|
612 | return self._files | |
607 |
|
613 | |||
|
614 | def get_streams( | |||
|
615 | self, | |||
|
616 | repo=None, | |||
|
617 | vfs=None, | |||
|
618 | copies=None, | |||
|
619 | max_changeset=None, | |||
|
620 | ): | |||
|
621 | if repo is None or max_changeset is None: | |||
|
622 | return super().get_streams( | |||
|
623 | repo=repo, | |||
|
624 | vfs=vfs, | |||
|
625 | copies=copies, | |||
|
626 | max_changeset=max_changeset, | |||
|
627 | ) | |||
|
628 | if any(k.endswith(b'.idx') for k in self._details.keys()): | |||
|
629 | # This uses revlog-v2; ignore for now | |||
|
630 | return super().get_streams( | |||
|
631 | repo=repo, | |||
|
632 | vfs=vfs, | |||
|
633 | copies=copies, | |||
|
634 | max_changeset=max_changeset, | |||
|
635 | ) | |||
|
636 | name_to_ext = {} | |||
|
637 | for ext in self._details.keys(): | |||
|
638 | name_to_ext[self._path_prefix + ext] = ext | |||
|
639 | name_to_size = {} | |||
|
640 | for f in self.files(): | |||
|
641 | name_to_size[f.unencoded_path] = f.file_size(None) | |||
|
642 | stream = [ | |||
|
643 | f.get_stream(vfs, copies) | |||
|
644 | for f in self.files() | |||
|
645 | if name_to_ext[f.unencoded_path] not in (b'.d', b'.i') | |||
|
646 | ] | |||
|
647 | ||||
|
648 | rl = self.get_revlog_instance(repo).get_revlog() | |||
|
649 | rl_stream = rl.get_streams(max_changeset) | |||
|
650 | for name, s, size in rl_stream: | |||
|
651 | if name_to_size.get(name, 0) != size: | |||
|
652 | msg = _(b"expected %d bytes but %d provided for %s") | |||
|
653 | msg %= name_to_size.get(name, 0), size, name | |||
|
654 | raise error.Abort(msg) | |||
|
655 | stream.extend(rl_stream) | |||
|
656 | files = self.files() | |||
|
657 | assert len(stream) == len(files), ( | |||
|
658 | stream, | |||
|
659 | files, | |||
|
660 | self._path_prefix, | |||
|
661 | self.target_id, | |||
|
662 | ) | |||
|
663 | return stream | |||
|
664 | ||||
608 | def get_revlog_instance(self, repo): |
|
665 | def get_revlog_instance(self, repo): | |
609 | """Obtain a revlog instance from this store entry |
|
666 | """Obtain a revlog instance from this store entry | |
610 |
|
667 |
@@ -635,6 +635,7 b' def _emit2(repo, entries):' | |||||
635 | # translate the vfs one |
|
635 | # translate the vfs one | |
636 | entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] |
|
636 | entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] | |
637 |
|
637 | |||
|
638 | max_linkrev = len(repo) | |||
638 | file_count = totalfilesize = 0 |
|
639 | file_count = totalfilesize = 0 | |
639 | # record the expected size of every file |
|
640 | # record the expected size of every file | |
640 | for k, vfs, e in entries: |
|
641 | for k, vfs, e in entries: | |
@@ -657,7 +658,10 b' def _emit2(repo, entries):' | |||||
657 | totalbytecount = 0 |
|
658 | totalbytecount = 0 | |
658 |
|
659 | |||
659 | for src, vfs, e in entries: |
|
660 | for src, vfs, e in entries: | |
660 |
|
|
661 | entry_streams = e.get_streams( | |
|
662 | repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev | |||
|
663 | ) | |||
|
664 | for name, stream, size in entry_streams: | |||
661 | yield src |
|
665 | yield src | |
662 | yield util.uvarintencode(len(name)) |
|
666 | yield util.uvarintencode(len(name)) | |
663 | yield util.uvarintencode(size) |
|
667 | yield util.uvarintencode(size) |
@@ -86,10 +86,10 b' wait for the client to be done cloning.' | |||||
86 | Check everything is fine |
|
86 | Check everything is fine | |
87 |
|
87 | |||
88 | $ cat client.log |
|
88 | $ cat client.log | |
89 |
remote: abort: unexpected error: |
|
89 | remote: abort: unexpected error: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !) | |
90 | abort: pull failed on remote (known-bad-output !) |
|
90 | abort: pull failed on remote (known-bad-output !) | |
91 | $ tail -2 errors.log |
|
91 | $ tail -2 errors.log | |
92 |
mercurial.error.Abort: |
|
92 | mercurial.error.Abort: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !) | |
93 | (known-bad-output !) |
|
93 | (known-bad-output !) | |
94 |
$ |
|
94 | $ hg -R clone-while-split verify | |
95 | checking changesets (missing-correct-output !) |
|
95 | checking changesets (missing-correct-output !) |
General Comments 0
You need to be logged in to leave comments.
Login now