discovery: slowly increase sampling size

Some pathological discovery runs can require many roundtrips. When this happens, things can get very slow.

To make the algorithm more resilient against such pathological cases, we slowly increase the sample size with each roundtrip (+5%). This will have a negligible impact on "normal" discovery with few roundtrips, but a large positive impact on cases with many roundtrips. Asking more questions per roundtrip helps to reduce the undecided set faster. Instead of reducing the undecided set at a linear speed (in the worst case), we reduce it at a guaranteed (small) exponential rate. The data below shows this slow ramp-up in sample size:

round trip    | 1   | 5     | 10    | 20    | 50     | 100     | 130       |
sample size   | 200 | 254   | 321   | 517   | 2 199  | 25 123  | 108 549   |
covered nodes | 200 | 1 357 | 2 821 | 7 031 | 42 658 | 524 530 | 2 276 755 |

To be a bit more concrete, let's take a very pathological case as an example. We are doing discovery from a copy of mozilla-try to a more recent version of mozilla-unified. Mozilla-unified heads are unknown to the mozilla-try repo and there are over 1 million "missing" changesets. (The discovery is "local" to avoid network interference.)

Without this change, the discovery:
- lasts 1858 seconds (31 minutes),
- does 1700 round trips,
- asks about 340 000 nodes.

With this change, the discovery:
- lasts 218 seconds (3 minutes 38 seconds, a -88% improvement),
- does 94 round trips (-94%),
- asks about 344 211 nodes (+1%).

Of course, this is an extreme case (and 3 minutes is still slow). However, this gives a good example of how this sample size increase acts as a safety net catching any bad situations.

We could imagine a steeper increase than 5%. For example, 10% would give the following numbers:

round trip    | 1   | 5     | 10    | 20     | 50      | 75        | 100        |
sample size   | 200 | 321   | 514   | 1 326  | 23 060  | 249 812   | 2 706 594  |
covered nodes | 200 | 1 541 | 3 690 | 12 671 | 251 871 | 2 746 254 | 29 770 966 |

In parallel, it is useful to understand these pathological cases and improve them. However, the current change provides a general-purpose safety net to smooth the impact of pathological cases.

To avoid issues with older HTTP servers, the increase in sample size only occurs if the protocol has no limit on command argument size.
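The ramp-up described above can be sketched as follows. This is a minimal illustration, not the code from `setdiscovery`: `sample_schedule` and its parameters are hypothetical, and `limited_arguments` merely stands in for the "protocol has a limit on command argument size" condition. The sketch multiplies the sample size by the growth factor after every round trip and truncates to an integer, so its numbers need not match the tables exactly. The running total it keeps counts nodes asked about, which is not necessarily how the "covered nodes" row was computed.

```python
def sample_schedule(initial, rounds, growth=1.05, limited_arguments=False):
    """Return (sample_sizes, asked_totals) for the first `rounds` round trips.

    Hypothetical helper: after every round trip the sample size is multiplied
    by `growth` (truncated to an int), unless `limited_arguments` is set, in
    which case it stays at `initial`. `asked_totals[k]` is the running total
    of nodes asked about after round trip k + 1.
    """
    sizes = []
    asked = []
    size = initial
    total = 0
    for _ in range(rounds):
        sizes.append(size)
        total += size
        asked.append(total)
        if not limited_arguments:
            # Geometric ramp-up: +5% per round trip with the default growth.
            size = int(size * growth)
    return sizes, asked
```

For example, `sample_schedule(200, 20)` grows the sample on every round trip, while `sample_schedule(200, 20, limited_arguments=True)` keeps it flat at 200, which corresponds to the linear worst case the change is meant to avoid.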

File last commit:

r41253:afa88401 default
r42546:dbd0fcca default
exchangev2.py
697 lines | 23.3 KiB | text/x-python | PythonLexer
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    repository,
    setdiscovery,
)

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction('clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(repo.root,
                                     # Empty maps to nevermatcher. So always
                                     # set includes if missing.
                                     pullop.includepats or {'path:.'},
                                     pullop.excludepats)

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # We don't properly support shallow changeset and manifest yet. So we apply
    # depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(b'ancestors(%ln, %s)',
                             pullheads, pullop.depth - 1):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres['added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres['added']
        manifestlinkrevs = manres['linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
                         manifestlinkrevs, shallow=bool(pullop.depth))

def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # Command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if user requested stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if b'revlogv1' not in repo.requirements:
        return False

    return True

def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'rawstorefiledata', {
            b'files': [b'changelog', b'manifestlog'],
        }).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
                                        unit=_('bytes'))
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(_(b'remote file data missing key: %s')
                                          % k)

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(_(b'invalid location for raw file data: '
                                        b'%s') % filemeta[b'location'])

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(_(
                                b'received invalid number of bytes for file '
                                b'data; expected %d, got extra') %
                                              filemeta[b'size'])

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(_(
                                b'did not receive indefinite length bytestring '
                                b'for file data'))

                if bytesremaining:
                    # Note the trailing space after "for": the two literals
                    # are concatenated into one message.
                    raise error.Abort(_(b'received invalid number of bytes for '
                                        b'file data; expected %d got %d') %
                                      (filemeta[b'size'],
                                       filemeta[b'size'] - bytesremaining))

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'revisions': [{
                b'type': b'changesetdagrange',
                b'roots': sorted(common),
                b'heads': sorted(remoteheads),
            }],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _('scanning manifests'), total=len(manifestnodes))

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))

def _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csets,
                         manlinkrevs, shallow=False):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)
            # Count every record announced in the path header, including
            # records skipped below. Otherwise a skipped record would make
            # this loop read into the next path's segment.
            remaining -= 1

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # (a) this repo has shallow file storage or (b) shallow data fetching is
    # enabled. Force remote to not delta against possibly unknown revisions
    # when either condition holds.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = [x for x in csets[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [{
                    b'type': b'changesetexplicit',
                    b'nodes': batch,
                }],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in fnodes[path].iteritems()}

                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(iterrevisions(objs, header[b'totalitems'],
                                             progress),
                               getlinkrev,
                               weakref.proxy(tr),
                               maybemissingparents=shallow)