exchangev2.py
807 lines
| 24.9 KiB
| text/x-python
|
PythonLexer
/ mercurial / exchangev2.py
Gregory Szorc
|
r39665 | # exchangev2.py - repository exchange for wire protocol version 2 | ||
# | ||||
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
Gregory Szorc
|
r39676 | import collections | ||
Gregory Szorc
|
r39667 | import weakref | ||
from .i18n import _ | ||||
Gregory Szorc
|
r39665 | from .node import ( | ||
nullid, | ||||
Gregory Szorc
|
r39667 | short, | ||
Gregory Szorc
|
r39665 | ) | ||
from . import ( | ||||
Gregory Szorc
|
r39671 | bookmarks, | ||
Gregory Szorc
|
r39674 | error, | ||
Gregory Szorc
|
r39667 | mdiff, | ||
Gregory Szorc
|
r40363 | narrowspec, | ||
Gregory Szorc
|
r39669 | phases, | ||
Gregory Szorc
|
r39667 | pycompat, | ||
Raphaël Gomès
|
r47371 | requirements as requirementsmod, | ||
Pulkit Goyal
|
r43078 | setdiscovery, | ||
) | ||||
Augie Fackler
|
r43346 | from .interfaces import repository | ||
Gregory Szorc
|
r39665 | |||
def pull(pullop): | ||||
"""Pull using wire protocol version 2.""" | ||||
repo = pullop.repo | ||||
remote = pullop.remote | ||||
Gregory Szorc
|
r40366 | |||
usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop) | ||||
# If this is a clone and it was requested to perform a "stream clone", | ||||
# we obtain the raw files data from the remote then fall back to an | ||||
# incremental pull. This is somewhat hacky and is not nearly robust enough | ||||
# for long-term usage. | ||||
if usingrawchangelogandmanifest: | ||||
Augie Fackler
|
r43347 | with repo.transaction(b'clone'): | ||
Gregory Szorc
|
r40366 | _fetchrawstorefiles(repo, remote) | ||
repo.invalidate(clearfilecache=True) | ||||
Gregory Szorc
|
r39667 | tr = pullop.trmanager.transaction() | ||
Gregory Szorc
|
r39665 | |||
Gregory Szorc
|
r40363 | # We don't use the repo's narrow matcher here because the patterns passed | ||
# to exchange.pull() could be different. | ||||
Augie Fackler
|
r43346 | narrowmatcher = narrowspec.match( | ||
repo.root, | ||||
# Empty maps to nevermatcher. So always | ||||
# set includes if missing. | ||||
Augie Fackler
|
r43347 | pullop.includepats or {b'path:.'}, | ||
Augie Fackler
|
r43346 | pullop.excludepats, | ||
) | ||||
Gregory Szorc
|
r40363 | |||
if pullop.includepats or pullop.excludepats: | ||||
pathfilter = {} | ||||
if pullop.includepats: | ||||
pathfilter[b'include'] = sorted(pullop.includepats) | ||||
if pullop.excludepats: | ||||
pathfilter[b'exclude'] = sorted(pullop.excludepats) | ||||
else: | ||||
pathfilter = None | ||||
Gregory Szorc
|
r39665 | # Figure out what needs to be fetched. | ||
common, fetch, remoteheads = _pullchangesetdiscovery( | ||||
Augie Fackler
|
r43346 | repo, remote, pullop.heads, abortwhenunrelated=pullop.force | ||
) | ||||
Gregory Szorc
|
r39665 | |||
Gregory Szorc
|
r39669 | # And fetch the data. | ||
Gregory Szorc
|
r39667 | pullheads = pullop.heads or remoteheads | ||
Gregory Szorc
|
r39669 | csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads) | ||
# New revisions are written to the changelog. But all other updates | ||||
# are deferred. Do those now. | ||||
# Ensure all new changesets are draft by default. If the repo is | ||||
# publishing, the phase will be adjusted by the loop below. | ||||
Augie Fackler
|
r43347 | if csetres[b'added']: | ||
Joerg Sonnenberger
|
r46375 | phases.registernew( | ||
repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']] | ||||
) | ||||
Gregory Szorc
|
r39669 | |||
# And adjust the phase of all changesets accordingly. | ||||
Joerg Sonnenberger
|
r45677 | for phasenumber, phase in phases.phasenames.items(): | ||
Augie Fackler
|
r43347 | if phase == b'secret' or not csetres[b'nodesbyphase'][phase]: | ||
Gregory Szorc
|
r39669 | continue | ||
Augie Fackler
|
r43346 | phases.advanceboundary( | ||
Augie Fackler
|
r46554 | repo, | ||
tr, | ||||
phasenumber, | ||||
csetres[b'nodesbyphase'][phase], | ||||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39667 | |||
Gregory Szorc
|
r39671 | # Write bookmark updates. | ||
Augie Fackler
|
r43346 | bookmarks.updatefromremote( | ||
repo.ui, | ||||
repo, | ||||
Augie Fackler
|
r43347 | csetres[b'bookmarks'], | ||
Augie Fackler
|
r43346 | remote.url(), | ||
pullop.gettransaction, | ||||
explicit=pullop.explicitbookmarks, | ||||
) | ||||
Gregory Szorc
|
r39671 | |||
Augie Fackler
|
r43347 | manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes']) | ||
Gregory Szorc
|
r39676 | |||
Gregory Szorc
|
r40429 | # We don't properly support shallow changeset and manifest yet. So we apply | ||
# depth limiting locally. | ||||
if pullop.depth: | ||||
relevantcsetnodes = set() | ||||
clnode = repo.changelog.node | ||||
Augie Fackler
|
r43346 | for rev in repo.revs( | ||
b'ancestors(%ln, %s)', pullheads, pullop.depth - 1 | ||||
): | ||||
Gregory Szorc
|
r40429 | relevantcsetnodes.add(clnode(rev)) | ||
csetrelevantfilter = lambda n: n in relevantcsetnodes | ||||
else: | ||||
csetrelevantfilter = lambda n: True | ||||
Gregory Szorc
|
r40366 | # If obtaining the raw store files, we need to scan the full repo to | ||
# derive all the changesets, manifests, and linkrevs. | ||||
if usingrawchangelogandmanifest: | ||||
csetsforfiles = [] | ||||
mnodesforfiles = [] | ||||
manifestlinkrevs = {} | ||||
for rev in repo: | ||||
ctx = repo[rev] | ||||
Gregory Szorc
|
r40429 | node = ctx.node() | ||
if not csetrelevantfilter(node): | ||||
continue | ||||
Gregory Szorc
|
r40366 | mnode = ctx.manifestnode() | ||
Gregory Szorc
|
r40429 | csetsforfiles.append(node) | ||
Gregory Szorc
|
r40366 | mnodesforfiles.append(mnode) | ||
manifestlinkrevs[mnode] = rev | ||||
else: | ||||
Augie Fackler
|
r43347 | csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)] | ||
mnodesforfiles = manres[b'added'] | ||||
manifestlinkrevs = manres[b'linkrevs'] | ||||
Gregory Szorc
|
r40366 | |||
Gregory Szorc
|
r39676 | # Find all file nodes referenced by added manifests and fetch those | ||
# revisions. | ||||
Gregory Szorc
|
r40366 | fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles) | ||
Augie Fackler
|
r43346 | _fetchfilesfromcsets( | ||
repo, | ||||
tr, | ||||
remote, | ||||
pathfilter, | ||||
fnodes, | ||||
csetsforfiles, | ||||
manifestlinkrevs, | ||||
shallow=bool(pullop.depth), | ||||
) | ||||
Gregory Szorc
|
r40366 | |||
def _checkuserawstorefiledata(pullop): | ||||
"""Check whether we should use rawstorefiledata command to retrieve data.""" | ||||
repo = pullop.repo | ||||
remote = pullop.remote | ||||
# Command to obtain raw store data isn't available. | ||||
if b'rawstorefiledata' not in remote.apidescriptor[b'commands']: | ||||
return False | ||||
# Only honor if user requested stream clone operation. | ||||
if not pullop.streamclonerequested: | ||||
return False | ||||
# Only works on empty repos. | ||||
if len(repo): | ||||
return False | ||||
# TODO This is super hacky. There needs to be a storage API for this. We | ||||
# also need to check for compatibility with the remote. | ||||
Raphaël Gomès
|
r47371 | if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements: | ||
Gregory Szorc
|
r40366 | return False | ||
return True | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r40366 | def _fetchrawstorefiles(repo, remote): | ||
with remote.commandexecutor() as e: | ||||
Augie Fackler
|
r43346 | objs = e.callcommand( | ||
Augie Fackler
|
r46554 | b'rawstorefiledata', | ||
{ | ||||
b'files': [b'changelog', b'manifestlog'], | ||||
}, | ||||
Augie Fackler
|
r43346 | ).result() | ||
Gregory Szorc
|
r40366 | |||
# First object is a summary of files data that follows. | ||||
overall = next(objs) | ||||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes') | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r40366 | with progress: | ||
progress.update(0) | ||||
# Next are pairs of file metadata, data. | ||||
while True: | ||||
try: | ||||
filemeta = next(objs) | ||||
except StopIteration: | ||||
break | ||||
for k in (b'location', b'path', b'size'): | ||||
if k not in filemeta: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_(b'remote file data missing key: %s') % k | ||||
) | ||||
Gregory Szorc
|
r40366 | |||
if filemeta[b'location'] == b'store': | ||||
vfs = repo.svfs | ||||
else: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
Martin von Zweigbergk
|
r43387 | _(b'invalid location for raw file data: %s') | ||
Augie Fackler
|
r43346 | % filemeta[b'location'] | ||
) | ||||
Gregory Szorc
|
r40366 | |||
bytesremaining = filemeta[b'size'] | ||||
with vfs.open(filemeta[b'path'], b'wb') as fh: | ||||
while True: | ||||
try: | ||||
chunk = next(objs) | ||||
except StopIteration: | ||||
break | ||||
bytesremaining -= len(chunk) | ||||
if bytesremaining < 0: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_( | ||||
b'received invalid number of bytes for file ' | ||||
b'data; expected %d, got extra' | ||||
) | ||||
% filemeta[b'size'] | ||||
) | ||||
Gregory Szorc
|
r40366 | |||
progress.increment(step=len(chunk)) | ||||
fh.write(chunk) | ||||
try: | ||||
if chunk.islast: | ||||
break | ||||
except AttributeError: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_( | ||||
b'did not receive indefinite length bytestring ' | ||||
b'for file data' | ||||
) | ||||
) | ||||
Gregory Szorc
|
r40366 | |||
if bytesremaining: | ||||
Augie Fackler
|
r43346 | raise error.Abort( | ||
_( | ||||
b'received invalid number of bytes for' | ||||
b'file data; expected %d got %d' | ||||
) | ||||
% ( | ||||
filemeta[b'size'], | ||||
filemeta[b'size'] - bytesremaining, | ||||
) | ||||
) | ||||
Gregory Szorc
|
r39674 | |||
Gregory Szorc
|
r39665 | def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): | ||
"""Determine which changesets need to be pulled.""" | ||||
if heads: | ||||
knownnode = repo.changelog.hasnode | ||||
if all(knownnode(head) for head in heads): | ||||
return heads, False, heads | ||||
# TODO wire protocol version 2 is capable of more efficient discovery | ||||
# than setdiscovery. Consider implementing something better. | ||||
common, fetch, remoteheads = setdiscovery.findcommonheads( | ||||
Augie Fackler
|
r43346 | repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated | ||
) | ||||
Gregory Szorc
|
r39665 | |||
common = set(common) | ||||
remoteheads = set(remoteheads) | ||||
# If a remote head is filtered locally, put it back in the common set. | ||||
# See the comment in exchange._pulldiscoverychangegroup() for more. | ||||
if fetch and remoteheads: | ||||
r43946 | has_node = repo.unfiltered().changelog.index.has_node | |||
Gregory Szorc
|
r39665 | |||
r43946 | common |= {head for head in remoteheads if has_node(head)} | |||
Gregory Szorc
|
r39665 | |||
if set(remoteheads).issubset(common): | ||||
fetch = [] | ||||
common.discard(nullid) | ||||
return common, fetch, remoteheads | ||||
Gregory Szorc
|
r39667 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r39667 | def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads): | ||
# TODO consider adding a step here where we obtain the DAG shape first | ||||
# (or ask the server to slice changesets into chunks for us) so that | ||||
# we can perform multiple fetches in batches. This will facilitate | ||||
# resuming interrupted clones, higher server-side cache hit rates due | ||||
# to smaller segments, etc. | ||||
with remote.commandexecutor() as e: | ||||
Augie Fackler
|
r43346 | objs = e.callcommand( | ||
b'changesetdata', | ||||
{ | ||||
b'revisions': [ | ||||
{ | ||||
b'type': b'changesetdagrange', | ||||
b'roots': sorted(common), | ||||
b'heads': sorted(remoteheads), | ||||
} | ||||
], | ||||
b'fields': {b'bookmarks', b'parents', b'phase', b'revision'}, | ||||
}, | ||||
).result() | ||||
Gregory Szorc
|
r39667 | |||
# The context manager waits on all response data when exiting. So | ||||
# we need to remain in the context manager in order to stream data. | ||||
return _processchangesetdata(repo, tr, objs) | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r39667 | def _processchangesetdata(repo, tr, objs): | ||
Augie Fackler
|
r43347 | repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs)) | ||
Gregory Szorc
|
r39667 | |||
urepo = repo.unfiltered() | ||||
cl = urepo.changelog | ||||
cl.delayupdate(tr) | ||||
# The first emitted object is a header describing the data that | ||||
# follows. | ||||
meta = next(objs) | ||||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems') | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39667 | |||
Gregory Szorc
|
r39674 | manifestnodes = {} | ||
Joerg Sonnenberger
|
r46373 | added = [] | ||
Gregory Szorc
|
r39674 | |||
Gregory Szorc
|
r39667 | def linkrev(node): | ||
Augie Fackler
|
r43347 | repo.ui.debug(b'add changeset %s\n' % short(node)) | ||
Gregory Szorc
|
r39667 | # Linkrev for changelog is always self. | ||
return len(cl) | ||||
Joerg Sonnenberger
|
r47259 | def ondupchangeset(cl, rev): | ||
added.append(cl.node(rev)) | ||||
Joerg Sonnenberger
|
r46373 | |||
Joerg Sonnenberger
|
r47259 | def onchangeset(cl, rev): | ||
Gregory Szorc
|
r39667 | progress.increment() | ||
Joerg Sonnenberger
|
r47067 | revision = cl.changelogrevision(rev) | ||
Joerg Sonnenberger
|
r47259 | added.append(cl.node(rev)) | ||
Gregory Szorc
|
r39674 | |||
# We need to preserve the mapping of changelog revision to node | ||||
# so we can set the linkrev accordingly when manifests are added. | ||||
Joerg Sonnenberger
|
r47067 | manifestnodes[rev] = revision.manifest | ||
Gregory Szorc
|
r39674 | |||
Joerg Sonnenberger
|
r47083 | repo.register_changeset(rev, revision) | ||
Joerg Sonnenberger
|
r45677 | nodesbyphase = {phase: set() for phase in phases.phasenames.values()} | ||
Gregory Szorc
|
r39671 | remotebookmarks = {} | ||
Gregory Szorc
|
r39669 | |||
Gregory Szorc
|
r39667 | # addgroup() expects a 7-tuple describing revisions. This normalizes | ||
# the wire data to that format. | ||||
Gregory Szorc
|
r39669 | # | ||
# This loop also aggregates non-revision metadata, such as phase | ||||
# data. | ||||
Gregory Szorc
|
r39667 | def iterrevisions(): | ||
for cset in objs: | ||||
Gregory Szorc
|
r39669 | node = cset[b'node'] | ||
if b'phase' in cset: | ||||
nodesbyphase[cset[b'phase']].add(node) | ||||
Gregory Szorc
|
r39671 | for mark in cset.get(b'bookmarks', []): | ||
remotebookmarks[mark] = node | ||||
Gregory Szorc
|
r39672 | # TODO add mechanism for extensions to examine records so they | ||
# can siphon off custom data fields. | ||||
Gregory Szorc
|
r39839 | extrafields = {} | ||
for field, size in cset.get(b'fieldsfollowing', []): | ||||
extrafields[field] = next(objs) | ||||
Gregory Szorc
|
r39669 | # Some entries might only be metadata only updates. | ||
Gregory Szorc
|
r39839 | if b'revision' not in extrafields: | ||
Gregory Szorc
|
r39669 | continue | ||
Gregory Szorc
|
r39839 | data = extrafields[b'revision'] | ||
Gregory Szorc
|
r39667 | |||
yield ( | ||||
Gregory Szorc
|
r39669 | node, | ||
Gregory Szorc
|
r39667 | cset[b'parents'][0], | ||
cset[b'parents'][1], | ||||
# Linknode is always itself for changesets. | ||||
cset[b'node'], | ||||
# We always send full revisions. So delta base is not set. | ||||
nullid, | ||||
mdiff.trivialdiffheader(len(data)) + data, | ||||
# Flags not yet supported. | ||||
0, | ||||
Raphaël Gomès
|
r47445 | # Sidedata not yet supported | ||
{}, | ||||
Gregory Szorc
|
r39667 | ) | ||
Joerg Sonnenberger
|
r46373 | cl.addgroup( | ||
iterrevisions(), | ||||
linkrev, | ||||
weakref.proxy(tr), | ||||
Joerg Sonnenberger
|
r47085 | alwayscache=True, | ||
Joerg Sonnenberger
|
r46373 | addrevisioncb=onchangeset, | ||
duplicaterevisioncb=ondupchangeset, | ||||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39667 | |||
progress.complete() | ||||
return { | ||||
Augie Fackler
|
r43347 | b'added': added, | ||
b'nodesbyphase': nodesbyphase, | ||||
b'bookmarks': remotebookmarks, | ||||
b'manifestnodes': manifestnodes, | ||||
Gregory Szorc
|
r39667 | } | ||
Gregory Szorc
|
r39674 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r39674 | def _fetchmanifests(repo, tr, remote, manifestnodes): | ||
rootmanifest = repo.manifestlog.getstorage(b'') | ||||
# Some manifests can be shared between changesets. Filter out revisions | ||||
# we already know about. | ||||
fetchnodes = [] | ||||
linkrevs = {} | ||||
seen = set() | ||||
Gregory Szorc
|
r43376 | for clrev, node in sorted(pycompat.iteritems(manifestnodes)): | ||
Gregory Szorc
|
r39674 | if node in seen: | ||
continue | ||||
try: | ||||
rootmanifest.rev(node) | ||||
except error.LookupError: | ||||
fetchnodes.append(node) | ||||
linkrevs[node] = clrev | ||||
seen.add(node) | ||||
# TODO handle tree manifests | ||||
# addgroup() expects 7-tuple describing revisions. This normalizes | ||||
# the wire data to that format. | ||||
def iterrevisions(objs, progress): | ||||
for manifest in objs: | ||||
node = manifest[b'node'] | ||||
Gregory Szorc
|
r39839 | extrafields = {} | ||
for field, size in manifest.get(b'fieldsfollowing', []): | ||||
extrafields[field] = next(objs) | ||||
if b'delta' in extrafields: | ||||
Gregory Szorc
|
r39674 | basenode = manifest[b'deltabasenode'] | ||
Gregory Szorc
|
r39839 | delta = extrafields[b'delta'] | ||
elif b'revision' in extrafields: | ||||
Gregory Szorc
|
r39674 | basenode = nullid | ||
Gregory Szorc
|
r39839 | revision = extrafields[b'revision'] | ||
Gregory Szorc
|
r39674 | delta = mdiff.trivialdiffheader(len(revision)) + revision | ||
else: | ||||
continue | ||||
yield ( | ||||
node, | ||||
manifest[b'parents'][0], | ||||
manifest[b'parents'][1], | ||||
# The value passed in is passed to the lookup function passed | ||||
# to addgroup(). We already have a map of manifest node to | ||||
# changelog revision number. So we just pass in the | ||||
# manifest node here and use linkrevs.__getitem__ as the | ||||
# resolution function. | ||||
node, | ||||
basenode, | ||||
delta, | ||||
# Flags not yet supported. | ||||
Augie Fackler
|
r43346 | 0, | ||
Raphaël Gomès
|
r47445 | # Sidedata not yet supported. | ||
{}, | ||||
Gregory Szorc
|
r39674 | ) | ||
progress.increment() | ||||
Augie Fackler
|
r43346 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39674 | |||
Gregory Szorc
|
r40209 | commandmeta = remote.apidescriptor[b'commands'][b'manifestdata'] | ||
batchsize = commandmeta.get(b'recommendedbatchsize', 10000) | ||||
Gregory Szorc
|
r39674 | # TODO make size configurable on client? | ||
# We send commands 1 at a time to the remote. This is not the most | ||||
# efficient because we incur a round trip at the end of each batch. | ||||
# However, the existing frame-based reactor keeps consuming server | ||||
# data in the background. And this results in response data buffering | ||||
# in memory. This can consume gigabytes of memory. | ||||
# TODO send multiple commands in a request once background buffering | ||||
# issues are resolved. | ||||
added = [] | ||||
for i in pycompat.xrange(0, len(fetchnodes), batchsize): | ||||
Augie Fackler
|
r43346 | batch = [node for node in fetchnodes[i : i + batchsize]] | ||
Gregory Szorc
|
r39674 | if not batch: | ||
continue | ||||
with remote.commandexecutor() as e: | ||||
Augie Fackler
|
r43346 | objs = e.callcommand( | ||
b'manifestdata', | ||||
{ | ||||
b'tree': b'', | ||||
b'nodes': batch, | ||||
b'fields': {b'parents', b'revision'}, | ||||
b'haveparents': True, | ||||
}, | ||||
).result() | ||||
Gregory Szorc
|
r39674 | |||
# Chomp off header object. | ||||
next(objs) | ||||
Joerg Sonnenberger
|
r47259 | def onchangeset(cl, rev): | ||
added.append(cl.node(rev)) | ||||
Joerg Sonnenberger
|
r46373 | |||
rootmanifest.addgroup( | ||||
iterrevisions(objs, progress), | ||||
linkrevs.__getitem__, | ||||
weakref.proxy(tr), | ||||
addrevisioncb=onchangeset, | ||||
duplicaterevisioncb=onchangeset, | ||||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39674 | |||
progress.complete() | ||||
return { | ||||
Augie Fackler
|
r43347 | b'added': added, | ||
b'linkrevs': linkrevs, | ||||
Gregory Szorc
|
r39674 | } | ||
Gregory Szorc
|
r39676 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r40363 | def _derivefilesfrommanifests(repo, matcher, manifestnodes): | ||
Gregory Szorc
|
r39676 | """Determine what file nodes are relevant given a set of manifest nodes. | ||
Returns a dict mapping file paths to dicts of file node to first manifest | ||||
node. | ||||
""" | ||||
ml = repo.manifestlog | ||||
fnodes = collections.defaultdict(dict) | ||||
Gregory Szorc
|
r40071 | progress = repo.ui.makeprogress( | ||
Augie Fackler
|
r43347 | _(b'scanning manifests'), total=len(manifestnodes) | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r40071 | |||
with progress: | ||||
for manifestnode in manifestnodes: | ||||
m = ml.get(b'', manifestnode) | ||||
Gregory Szorc
|
r39676 | |||
Gregory Szorc
|
r40071 | # TODO this will pull in unwanted nodes because it takes the storage | ||
# delta into consideration. What we really want is something that | ||||
# takes the delta between the manifest's parents. And ideally we | ||||
# would ignore file nodes that are known locally. For now, ignore | ||||
# both these limitations. This will result in incremental fetches | ||||
# requesting data we already have. So this is far from ideal. | ||||
md = m.readfast() | ||||
Gregory Szorc
|
r39676 | |||
Gregory Szorc
|
r40071 | for path, fnode in md.items(): | ||
Gregory Szorc
|
r40363 | if matcher(path): | ||
fnodes[path].setdefault(fnode, manifestnode) | ||||
Gregory Szorc
|
r40071 | |||
progress.increment() | ||||
Gregory Szorc
|
r39676 | |||
return fnodes | ||||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r39676 | def _fetchfiles(repo, tr, remote, fnodes, linkrevs): | ||
Gregory Szorc
|
r40215 | """Fetch file data from explicit file revisions.""" | ||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
r39676 | def iterrevisions(objs, progress): | ||
for filerevision in objs: | ||||
node = filerevision[b'node'] | ||||
Gregory Szorc
|
r39839 | extrafields = {} | ||
for field, size in filerevision.get(b'fieldsfollowing', []): | ||||
extrafields[field] = next(objs) | ||||
if b'delta' in extrafields: | ||||
Gregory Szorc
|
r39676 | basenode = filerevision[b'deltabasenode'] | ||
Gregory Szorc
|
r39839 | delta = extrafields[b'delta'] | ||
elif b'revision' in extrafields: | ||||
Gregory Szorc
|
r39676 | basenode = nullid | ||
Gregory Szorc
|
r39839 | revision = extrafields[b'revision'] | ||
Gregory Szorc
|
r39676 | delta = mdiff.trivialdiffheader(len(revision)) + revision | ||
else: | ||||
continue | ||||
yield ( | ||||
node, | ||||
filerevision[b'parents'][0], | ||||
filerevision[b'parents'][1], | ||||
node, | ||||
basenode, | ||||
delta, | ||||
# Flags not yet supported. | ||||
0, | ||||
Raphaël Gomès
|
r47445 | # Sidedata not yet supported. | ||
{}, | ||||
Gregory Szorc
|
r39676 | ) | ||
progress.increment() | ||||
progress = repo.ui.makeprogress( | ||||
Augie Fackler
|
r43347 | _(b'files'), | ||
unit=_(b'chunks'), | ||||
Gregory Szorc
|
r43374 | total=sum(len(v) for v in pycompat.itervalues(fnodes)), | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r39676 | |||
# TODO make batch size configurable | ||||
batchsize = 10000 | ||||
fnodeslist = [x for x in sorted(fnodes.items())] | ||||
for i in pycompat.xrange(0, len(fnodeslist), batchsize): | ||||
Augie Fackler
|
r43346 | batch = [x for x in fnodeslist[i : i + batchsize]] | ||
Gregory Szorc
|
r39676 | if not batch: | ||
continue | ||||
with remote.commandexecutor() as e: | ||||
fs = [] | ||||
locallinkrevs = {} | ||||
for path, nodes in batch: | ||||
Augie Fackler
|
r43346 | fs.append( | ||
( | ||||
path, | ||||
e.callcommand( | ||||
b'filedata', | ||||
{ | ||||
b'path': path, | ||||
b'nodes': sorted(nodes), | ||||
b'fields': {b'parents', b'revision'}, | ||||
b'haveparents': True, | ||||
}, | ||||
), | ||||
) | ||||
) | ||||
Gregory Szorc
|
r39676 | |||
locallinkrevs[path] = { | ||||
node: linkrevs[manifestnode] | ||||
Gregory Szorc
|
r43376 | for node, manifestnode in pycompat.iteritems(nodes) | ||
Augie Fackler
|
r43346 | } | ||
Gregory Szorc
|
r39676 | |||
for path, f in fs: | ||||
objs = f.result() | ||||
# Chomp off header objects. | ||||
next(objs) | ||||
store = repo.file(path) | ||||
store.addgroup( | ||||
iterrevisions(objs, progress), | ||||
locallinkrevs[path].__getitem__, | ||||
Augie Fackler
|
r43346 | weakref.proxy(tr), | ||
) | ||||
Gregory Szorc
|
r40215 | |||
Augie Fackler
|
r43346 | def _fetchfilesfromcsets( | ||
repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False | ||||
): | ||||
Gregory Szorc
|
r40215 | """Fetch file data from explicit changeset revisions.""" | ||
def iterrevisions(objs, remaining, progress): | ||||
while remaining: | ||||
filerevision = next(objs) | ||||
node = filerevision[b'node'] | ||||
extrafields = {} | ||||
for field, size in filerevision.get(b'fieldsfollowing', []): | ||||
extrafields[field] = next(objs) | ||||
if b'delta' in extrafields: | ||||
basenode = filerevision[b'deltabasenode'] | ||||
delta = extrafields[b'delta'] | ||||
elif b'revision' in extrafields: | ||||
basenode = nullid | ||||
revision = extrafields[b'revision'] | ||||
delta = mdiff.trivialdiffheader(len(revision)) + revision | ||||
else: | ||||
continue | ||||
Gregory Szorc
|
r40429 | if b'linknode' in filerevision: | ||
linknode = filerevision[b'linknode'] | ||||
else: | ||||
linknode = node | ||||
Gregory Szorc
|
r40215 | yield ( | ||
node, | ||||
filerevision[b'parents'][0], | ||||
filerevision[b'parents'][1], | ||||
Gregory Szorc
|
r40429 | linknode, | ||
Gregory Szorc
|
r40215 | basenode, | ||
delta, | ||||
# Flags not yet supported. | ||||
0, | ||||
Raphaël Gomès
|
r47445 | # Sidedata not yet supported. | ||
{}, | ||||
Gregory Szorc
|
r40215 | ) | ||
progress.increment() | ||||
remaining -= 1 | ||||
progress = repo.ui.makeprogress( | ||||
Augie Fackler
|
r43347 | _(b'files'), | ||
unit=_(b'chunks'), | ||||
Gregory Szorc
|
r43374 | total=sum(len(v) for v in pycompat.itervalues(fnodes)), | ||
Augie Fackler
|
r43346 | ) | ||
Gregory Szorc
|
r40215 | |||
commandmeta = remote.apidescriptor[b'commands'][b'filesdata'] | ||||
batchsize = commandmeta.get(b'recommendedbatchsize', 50000) | ||||
Gregory Szorc
|
r40429 | shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features | ||
fields = {b'parents', b'revision'} | ||||
clrev = repo.changelog.rev | ||||
# There are no guarantees that we'll have ancestor revisions if | ||||
# a) this repo has shallow file storage b) shallow data fetching is enabled. | ||||
# Force remote to not delta against possibly unknown revisions when these | ||||
# conditions hold. | ||||
haveparents = not (shallowfiles or shallow) | ||||
# Similarly, we may not have calculated linkrevs for all incoming file | ||||
# revisions. Ask the remote to do work for us in this case. | ||||
if not haveparents: | ||||
fields.add(b'linknode') | ||||
Gregory Szorc
|
r40215 | for i in pycompat.xrange(0, len(csets), batchsize): | ||
Augie Fackler
|
r43346 | batch = [x for x in csets[i : i + batchsize]] | ||
Gregory Szorc
|
r40215 | if not batch: | ||
continue | ||||
with remote.commandexecutor() as e: | ||||
args = { | ||||
Augie Fackler
|
r43346 | b'revisions': [ | ||
Augie Fackler
|
r46554 | { | ||
b'type': b'changesetexplicit', | ||||
b'nodes': batch, | ||||
} | ||||
Augie Fackler
|
r43346 | ], | ||
Gregory Szorc
|
r40429 | b'fields': fields, | ||
b'haveparents': haveparents, | ||||
Gregory Szorc
|
r40215 | } | ||
Gregory Szorc
|
r40363 | if pathfilter: | ||
args[b'pathfilter'] = pathfilter | ||||
Gregory Szorc
|
r40215 | objs = e.callcommand(b'filesdata', args).result() | ||
# First object is an overall header. | ||||
overall = next(objs) | ||||
# We have overall['totalpaths'] segments. | ||||
for i in pycompat.xrange(overall[b'totalpaths']): | ||||
header = next(objs) | ||||
path = header[b'path'] | ||||
store = repo.file(path) | ||||
linkrevs = { | ||||
fnode: manlinkrevs[mnode] | ||||
Gregory Szorc
|
r43376 | for fnode, mnode in pycompat.iteritems(fnodes[path]) | ||
Augie Fackler
|
r43346 | } | ||
Gregory Szorc
|
r40215 | |||
Gregory Szorc
|
r40429 | def getlinkrev(node): | ||
if node in linkrevs: | ||||
return linkrevs[node] | ||||
else: | ||||
return clrev(node) | ||||
Augie Fackler
|
r43346 | store.addgroup( | ||
iterrevisions(objs, header[b'totalitems'], progress), | ||||
getlinkrev, | ||||
weakref.proxy(tr), | ||||
maybemissingparents=shallow, | ||||
) | ||||