##// END OF EJS Templates
debugdiscovery: fix swapped heads and roots...
debugdiscovery: fix swapped heads and roots Patch provided without comment… Differential Revision: https://phab.mercurial-scm.org/D9566

File last commit:

r46554:89a2afe3 default
r46727:9e24b3d8 default
Show More
exchangev2.py
795 lines | 24.5 KiB | text/x-python | PythonLexer
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 # exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
Gregory Szorc
exchangev2: fetch file revisions...
r39676 import collections
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 import weakref
from .i18n import _
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 from .node import (
nullid,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 short,
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 )
from . import (
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 bookmarks,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 error,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 mdiff,
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 narrowspec,
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 phases,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 pycompat,
Pulkit Goyal
interfaces: create a new folder for interfaces and move repository.py in it...
r43078 setdiscovery,
)
Augie Fackler
formatting: blacken the codebase...
r43346 from .interfaces import repository
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
def pull(pullop):
"""Pull using wire protocol version 2."""
repo = pullop.repo
remote = pullop.remote
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
# If this is a clone and it was requested to perform a "stream clone",
# we obtain the raw files data from the remote then fall back to an
# incremental pull. This is somewhat hacky and is not nearly robust enough
# for long-term usage.
if usingrawchangelogandmanifest:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 with repo.transaction(b'clone'):
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 _fetchrawstorefiles(repo, remote)
repo.invalidate(clearfilecache=True)
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 tr = pullop.trmanager.transaction()
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 # We don't use the repo's narrow matcher here because the patterns passed
# to exchange.pull() could be different.
Augie Fackler
formatting: blacken the codebase...
r43346 narrowmatcher = narrowspec.match(
repo.root,
# Empty maps to nevermatcher. So always
# set includes if missing.
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 pullop.includepats or {b'path:.'},
Augie Fackler
formatting: blacken the codebase...
r43346 pullop.excludepats,
)
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363
if pullop.includepats or pullop.excludepats:
pathfilter = {}
if pullop.includepats:
pathfilter[b'include'] = sorted(pullop.includepats)
if pullop.excludepats:
pathfilter[b'exclude'] = sorted(pullop.excludepats)
else:
pathfilter = None
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 # Figure out what needs to be fetched.
common, fetch, remoteheads = _pullchangesetdiscovery(
Augie Fackler
formatting: blacken the codebase...
r43346 repo, remote, pullop.heads, abortwhenunrelated=pullop.force
)
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 # And fetch the data.
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 pullheads = pullop.heads or remoteheads
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
# New revisions are written to the changelog. But all other updates
# are deferred. Do those now.
# Ensure all new changesets are draft by default. If the repo is
# publishing, the phase will be adjusted by the loop below.
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if csetres[b'added']:
Joerg Sonnenberger
phases: convert registernew users to use revision sets...
r46375 phases.registernew(
repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
)
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669
# And adjust the phase of all changesets accordingly.
Joerg Sonnenberger
phases: sparsify phase lists...
r45677 for phasenumber, phase in phases.phasenames.items():
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 continue
Augie Fackler
formatting: blacken the codebase...
r43346 phases.advanceboundary(
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 repo,
tr,
phasenumber,
csetres[b'nodesbyphase'][phase],
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 # Write bookmark updates.
Augie Fackler
formatting: blacken the codebase...
r43346 bookmarks.updatefromremote(
repo.ui,
repo,
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 csetres[b'bookmarks'],
Augie Fackler
formatting: blacken the codebase...
r43346 remote.url(),
pullop.gettransaction,
explicit=pullop.explicitbookmarks,
)
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 # We don't properly support shallow changeset and manifest yet. So we apply
# depth limiting locally.
if pullop.depth:
relevantcsetnodes = set()
clnode = repo.changelog.node
Augie Fackler
formatting: blacken the codebase...
r43346 for rev in repo.revs(
b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
):
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 relevantcsetnodes.add(clnode(rev))
csetrelevantfilter = lambda n: n in relevantcsetnodes
else:
csetrelevantfilter = lambda n: True
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 # If obtaining the raw store files, we need to scan the full repo to
# derive all the changesets, manifests, and linkrevs.
if usingrawchangelogandmanifest:
csetsforfiles = []
mnodesforfiles = []
manifestlinkrevs = {}
for rev in repo:
ctx = repo[rev]
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 node = ctx.node()
if not csetrelevantfilter(node):
continue
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 mnode = ctx.manifestnode()
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 csetsforfiles.append(node)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 mnodesforfiles.append(mnode)
manifestlinkrevs[mnode] = rev
else:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
mnodesforfiles = manres[b'added']
manifestlinkrevs = manres[b'linkrevs']
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
Gregory Szorc
exchangev2: fetch file revisions...
r39676 # Find all file nodes referenced by added manifests and fetch those
# revisions.
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
Augie Fackler
formatting: blacken the codebase...
r43346 _fetchfilesfromcsets(
repo,
tr,
remote,
pathfilter,
fnodes,
csetsforfiles,
manifestlinkrevs,
shallow=bool(pullop.depth),
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
def _checkuserawstorefiledata(pullop):
"""Check whether we should use rawstorefiledata command to retrieve data."""
repo = pullop.repo
remote = pullop.remote
# Command to obtain raw store data isn't available.
if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
return False
# Only honor if user requested stream clone operation.
if not pullop.streamclonerequested:
return False
# Only works on empty repos.
if len(repo):
return False
# TODO This is super hacky. There needs to be a storage API for this. We
# also need to check for compatibility with the remote.
if b'revlogv1' not in repo.requirements:
return False
return True
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 def _fetchrawstorefiles(repo, remote):
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 b'rawstorefiledata',
{
b'files': [b'changelog', b'manifestlog'],
},
Augie Fackler
formatting: blacken the codebase...
r43346 ).result()
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
# First object is a summary of files data that follows.
overall = next(objs)
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 with progress:
progress.update(0)
# Next are pairs of file metadata, data.
while True:
try:
filemeta = next(objs)
except StopIteration:
break
for k in (b'location', b'path', b'size'):
if k not in filemeta:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(b'remote file data missing key: %s') % k
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
if filemeta[b'location'] == b'store':
vfs = repo.svfs
else:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
Martin von Zweigbergk
cleanup: join string literals that are already on one line...
r43387 _(b'invalid location for raw file data: %s')
Augie Fackler
formatting: blacken the codebase...
r43346 % filemeta[b'location']
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
bytesremaining = filemeta[b'size']
with vfs.open(filemeta[b'path'], b'wb') as fh:
while True:
try:
chunk = next(objs)
except StopIteration:
break
bytesremaining -= len(chunk)
if bytesremaining < 0:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'received invalid number of bytes for file '
b'data; expected %d, got extra'
)
% filemeta[b'size']
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
progress.increment(step=len(chunk))
fh.write(chunk)
try:
if chunk.islast:
break
except AttributeError:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'did not receive indefinite length bytestring '
b'for file data'
)
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
if bytesremaining:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'received invalid number of bytes for'
b'file data; expected %d got %d'
)
% (
filemeta[b'size'],
filemeta[b'size'] - bytesremaining,
)
)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
if heads:
knownnode = repo.changelog.hasnode
if all(knownnode(head) for head in heads):
return heads, False, heads
# TODO wire protocol version 2 is capable of more efficient discovery
# than setdiscovery. Consider implementing something better.
common, fetch, remoteheads = setdiscovery.findcommonheads(
Augie Fackler
formatting: blacken the codebase...
r43346 repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
)
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
common = set(common)
remoteheads = set(remoteheads)
# If a remote head is filtered locally, put it back in the common set.
# See the comment in exchange._pulldiscoverychangegroup() for more.
if fetch and remoteheads:
index: use `index.has_node` in `exchangev2._pullchangesetdiscovery`...
r43946 has_node = repo.unfiltered().changelog.index.has_node
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
index: use `index.has_node` in `exchangev2._pullchangesetdiscovery`...
r43946 common |= {head for head in remoteheads if has_node(head)}
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
if set(remoteheads).issubset(common):
fetch = []
common.discard(nullid)
return common, fetch, remoteheads
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
# TODO consider adding a step here where we obtain the DAG shape first
# (or ask the server to slice changesets into chunks for us) so that
# we can perform multiple fetches in batches. This will facilitate
# resuming interrupted clones, higher server-side cache hit rates due
# to smaller segments, etc.
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
b'changesetdata',
{
b'revisions': [
{
b'type': b'changesetdagrange',
b'roots': sorted(common),
b'heads': sorted(remoteheads),
}
],
b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
},
).result()
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
# The context manager waits on all response data when exiting. So
# we need to remain in the context manager in order to stream data.
return _processchangesetdata(repo, tr, objs)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def _processchangesetdata(repo, tr, objs):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
urepo = repo.unfiltered()
cl = urepo.changelog
cl.delayupdate(tr)
# The first emitted object is a header describing the data that
# follows.
meta = next(objs)
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 manifestnodes = {}
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 added = []
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def linkrev(node):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 repo.ui.debug(b'add changeset %s\n' % short(node))
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 # Linkrev for changelog is always self.
return len(cl)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 def ondupchangeset(cl, node):
added.append(node)
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def onchangeset(cl, node):
progress.increment()
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 revision = cl.changelogrevision(node)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 added.append(node)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
# We need to preserve the mapping of changelog revision to node
# so we can set the linkrev accordingly when manifests are added.
manifestnodes[cl.rev(node)] = revision.manifest
Joerg Sonnenberger
phases: sparsify phase lists...
r45677 nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 remotebookmarks = {}
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 # addgroup() expects a 7-tuple describing revisions. This normalizes
# the wire data to that format.
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 #
# This loop also aggregates non-revision metadata, such as phase
# data.
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def iterrevisions():
for cset in objs:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 node = cset[b'node']
if b'phase' in cset:
nodesbyphase[cset[b'phase']].add(node)
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 for mark in cset.get(b'bookmarks', []):
remotebookmarks[mark] = node
Gregory Szorc
wireprotov2: add TODOs around extending changesetdata fields...
r39672 # TODO add mechanism for extensions to examine records so they
# can siphon off custom data fields.
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in cset.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 # Some entries might only be metadata only updates.
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 if b'revision' not in extrafields:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 continue
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 data = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
yield (
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 node,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 cset[b'parents'][0],
cset[b'parents'][1],
# Linknode is always itself for changesets.
cset[b'node'],
# We always send full revisions. So delta base is not set.
nullid,
mdiff.trivialdiffheader(len(data)) + data,
# Flags not yet supported.
0,
)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 cl.addgroup(
iterrevisions(),
linkrev,
weakref.proxy(tr),
addrevisioncb=onchangeset,
duplicaterevisioncb=ondupchangeset,
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
progress.complete()
return {
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'added': added,
b'nodesbyphase': nodesbyphase,
b'bookmarks': remotebookmarks,
b'manifestnodes': manifestnodes,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 }
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 def _fetchmanifests(repo, tr, remote, manifestnodes):
rootmanifest = repo.manifestlog.getstorage(b'')
# Some manifests can be shared between changesets. Filter out revisions
# we already know about.
fetchnodes = []
linkrevs = {}
seen = set()
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 if node in seen:
continue
try:
rootmanifest.rev(node)
except error.LookupError:
fetchnodes.append(node)
linkrevs[node] = clrev
seen.add(node)
# TODO handle tree manifests
# addgroup() expects 7-tuple describing revisions. This normalizes
# the wire data to that format.
def iterrevisions(objs, progress):
for manifest in objs:
node = manifest[b'node']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in manifest.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 basenode = manifest[b'deltabasenode']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 delta = extrafields[b'delta']
elif b'revision' in extrafields:
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 basenode = nullid
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 revision = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
manifest[b'parents'][0],
manifest[b'parents'][1],
# The value passed in is passed to the lookup function passed
# to addgroup(). We already have a map of manifest node to
# changelog revision number. So we just pass in the
# manifest node here and use linkrevs.__getitem__ as the
# resolution function.
node,
basenode,
delta,
# Flags not yet supported.
Augie Fackler
formatting: blacken the codebase...
r43346 0,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 )
progress.increment()
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: honor server advertised manifestdata recommended batch size...
r40209 commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 # TODO make size configurable on client?
# We send commands 1 at a time to the remote. This is not the most
# efficient because we incur a round trip at the end of each batch.
# However, the existing frame-based reactor keeps consuming server
# data in the background. And this results in response data buffering
# in memory. This can consume gigabytes of memory.
# TODO send multiple commands in a request once background buffering
# issues are resolved.
added = []
for i in pycompat.xrange(0, len(fetchnodes), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [node for node in fetchnodes[i : i + batchsize]]
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 if not batch:
continue
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
b'manifestdata',
{
b'tree': b'',
b'nodes': batch,
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
).result()
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
# Chomp off header object.
next(objs)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 def onchangeset(cl, node):
added.append(node)
rootmanifest.addgroup(
iterrevisions(objs, progress),
linkrevs.__getitem__,
weakref.proxy(tr),
addrevisioncb=onchangeset,
duplicaterevisioncb=onchangeset,
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
progress.complete()
return {
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'added': added,
b'linkrevs': linkrevs,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 }
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 def _derivefilesfrommanifests(repo, matcher, manifestnodes):
Gregory Szorc
exchangev2: fetch file revisions...
r39676 """Determine what file nodes are relevant given a set of manifest nodes.
Returns a dict mapping file paths to dicts of file node to first manifest
node.
"""
ml = repo.manifestlog
fnodes = collections.defaultdict(dict)
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'scanning manifests'), total=len(manifestnodes)
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071
with progress:
for manifestnode in manifestnodes:
m = ml.get(b'', manifestnode)
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 # TODO this will pull in unwanted nodes because it takes the storage
# delta into consideration. What we really want is something that
# takes the delta between the manifest's parents. And ideally we
# would ignore file nodes that are known locally. For now, ignore
# both these limitations. This will result in incremental fetches
# requesting data we already have. So this is far from ideal.
md = m.readfast()
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 for path, fnode in md.items():
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 if matcher(path):
fnodes[path].setdefault(fnode, manifestnode)
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071
progress.increment()
Gregory Szorc
exchangev2: fetch file revisions...
r39676
return fnodes
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch file revisions...
r39676 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
Gregory Szorc
exchangev2: use filesdata...
r40215 """Fetch file data from explicit file revisions."""
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch file revisions...
r39676 def iterrevisions(objs, progress):
for filerevision in objs:
node = filerevision[b'node']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
Gregory Szorc
exchangev2: fetch file revisions...
r39676 basenode = filerevision[b'deltabasenode']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 delta = extrafields[b'delta']
elif b'revision' in extrafields:
Gregory Szorc
exchangev2: fetch file revisions...
r39676 basenode = nullid
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 revision = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch file revisions...
r39676 delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
node,
basenode,
delta,
# Flags not yet supported.
0,
)
progress.increment()
progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'files'),
unit=_(b'chunks'),
Gregory Szorc
py3: define and use pycompat.itervalues()...
r43374 total=sum(len(v) for v in pycompat.itervalues(fnodes)),
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch file revisions...
r39676
# TODO make batch size configurable
batchsize = 10000
fnodeslist = [x for x in sorted(fnodes.items())]
for i in pycompat.xrange(0, len(fnodeslist), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [x for x in fnodeslist[i : i + batchsize]]
Gregory Szorc
exchangev2: fetch file revisions...
r39676 if not batch:
continue
with remote.commandexecutor() as e:
fs = []
locallinkrevs = {}
for path, nodes in batch:
Augie Fackler
formatting: blacken the codebase...
r43346 fs.append(
(
path,
e.callcommand(
b'filedata',
{
b'path': path,
b'nodes': sorted(nodes),
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
),
)
)
Gregory Szorc
exchangev2: fetch file revisions...
r39676
locallinkrevs[path] = {
node: linkrevs[manifestnode]
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for node, manifestnode in pycompat.iteritems(nodes)
Augie Fackler
formatting: blacken the codebase...
r43346 }
Gregory Szorc
exchangev2: fetch file revisions...
r39676
for path, f in fs:
objs = f.result()
# Chomp off header objects.
next(objs)
store = repo.file(path)
store.addgroup(
iterrevisions(objs, progress),
locallinkrevs[path].__getitem__,
Augie Fackler
formatting: blacken the codebase...
r43346 weakref.proxy(tr),
)
Gregory Szorc
exchangev2: use filesdata...
r40215
Augie Fackler
formatting: blacken the codebase...
r43346 def _fetchfilesfromcsets(
repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
Gregory Szorc
exchangev2: use filesdata...
r40215 """Fetch file data from explicit changeset revisions."""
def iterrevisions(objs, remaining, progress):
while remaining:
filerevision = next(objs)
node = filerevision[b'node']
extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
basenode = filerevision[b'deltabasenode']
delta = extrafields[b'delta']
elif b'revision' in extrafields:
basenode = nullid
revision = extrafields[b'revision']
delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 if b'linknode' in filerevision:
linknode = filerevision[b'linknode']
else:
linknode = node
Gregory Szorc
exchangev2: use filesdata...
r40215 yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 linknode,
Gregory Szorc
exchangev2: use filesdata...
r40215 basenode,
delta,
# Flags not yet supported.
0,
)
progress.increment()
remaining -= 1
progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'files'),
unit=_(b'chunks'),
Gregory Szorc
py3: define and use pycompat.itervalues()...
r43374 total=sum(len(v) for v in pycompat.itervalues(fnodes)),
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: use filesdata...
r40215
commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
fields = {b'parents', b'revision'}
clrev = repo.changelog.rev
# There are no guarantees that we'll have ancestor revisions if
# a) this repo has shallow file storage b) shallow data fetching is enabled.
# Force remote to not delta against possibly unknown revisions when these
# conditions hold.
haveparents = not (shallowfiles or shallow)
# Similarly, we may not have calculated linkrevs for all incoming file
# revisions. Ask the remote to do work for us in this case.
if not haveparents:
fields.add(b'linknode')
Gregory Szorc
exchangev2: use filesdata...
r40215 for i in pycompat.xrange(0, len(csets), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [x for x in csets[i : i + batchsize]]
Gregory Szorc
exchangev2: use filesdata...
r40215 if not batch:
continue
with remote.commandexecutor() as e:
args = {
Augie Fackler
formatting: blacken the codebase...
r43346 b'revisions': [
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 {
b'type': b'changesetexplicit',
b'nodes': batch,
}
Augie Fackler
formatting: blacken the codebase...
r43346 ],
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 b'fields': fields,
b'haveparents': haveparents,
Gregory Szorc
exchangev2: use filesdata...
r40215 }
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 if pathfilter:
args[b'pathfilter'] = pathfilter
Gregory Szorc
exchangev2: use filesdata...
r40215 objs = e.callcommand(b'filesdata', args).result()
# First object is an overall header.
overall = next(objs)
# We have overall['totalpaths'] segments.
for i in pycompat.xrange(overall[b'totalpaths']):
header = next(objs)
path = header[b'path']
store = repo.file(path)
linkrevs = {
fnode: manlinkrevs[mnode]
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for fnode, mnode in pycompat.iteritems(fnodes[path])
Augie Fackler
formatting: blacken the codebase...
r43346 }
Gregory Szorc
exchangev2: use filesdata...
r40215
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 def getlinkrev(node):
if node in linkrevs:
return linkrevs[node]
else:
return clrev(node)
Augie Fackler
formatting: blacken the codebase...
r43346 store.addgroup(
iterrevisions(objs, header[b'totalitems'], progress),
getlinkrev,
weakref.proxy(tr),
maybemissingparents=shallow,
)