##// END OF EJS Templates
copies: introduce the hg-cpython wrapper for `combine_changeset_copies`...
copies: introduce the hg-cpython wrapper for `combine_changeset_copies` This patch focus on the `hg-cpython` part of this work. Bridging the python code with the new rust code in `hg-core`. The next patch will actually plug this in the python code. The rust code use multiple Python callback, python related error within this callback are not expected unless they are a programming error or a data corruption. In addition, these callback will slowly be replaced by native Rust code. For these reasons, we use will deal with unexpected error within this callback using rust Panic and let the `rust-cpython` layer deal with raising a Python exception. The code dealing with the ChangedFile instance is repeating itself a lot. I did not factor these duplication out because that whole code will get replaced by entirely different one in a handful of changesets. Differential Revision: https://phab.mercurial-scm.org/D9298

File last commit:

r46554:89a2afe3 default
r46557:50c5ee3b default
Show More
exchangev2.py
795 lines | 24.5 KiB | text/x-python | PythonLexer
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 # exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
Gregory Szorc
exchangev2: fetch file revisions...
r39676 import collections
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 import weakref
from .i18n import _
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 from .node import (
nullid,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 short,
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 )
from . import (
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 bookmarks,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 error,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 mdiff,
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 narrowspec,
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 phases,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 pycompat,
Pulkit Goyal
interfaces: create a new folder for interfaces and move repository.py in it...
r43078 setdiscovery,
)
Augie Fackler
formatting: blacken the codebase...
r43346 from .interfaces import repository
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
def pull(pullop):
"""Pull using wire protocol version 2."""
repo = pullop.repo
remote = pullop.remote
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
# If this is a clone and it was requested to perform a "stream clone",
# we obtain the raw files data from the remote then fall back to an
# incremental pull. This is somewhat hacky and is not nearly robust enough
# for long-term usage.
if usingrawchangelogandmanifest:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 with repo.transaction(b'clone'):
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 _fetchrawstorefiles(repo, remote)
repo.invalidate(clearfilecache=True)
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 tr = pullop.trmanager.transaction()
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 # We don't use the repo's narrow matcher here because the patterns passed
# to exchange.pull() could be different.
Augie Fackler
formatting: blacken the codebase...
r43346 narrowmatcher = narrowspec.match(
repo.root,
# Empty maps to nevermatcher. So always
# set includes if missing.
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 pullop.includepats or {b'path:.'},
Augie Fackler
formatting: blacken the codebase...
r43346 pullop.excludepats,
)
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363
if pullop.includepats or pullop.excludepats:
pathfilter = {}
if pullop.includepats:
pathfilter[b'include'] = sorted(pullop.includepats)
if pullop.excludepats:
pathfilter[b'exclude'] = sorted(pullop.excludepats)
else:
pathfilter = None
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 # Figure out what needs to be fetched.
common, fetch, remoteheads = _pullchangesetdiscovery(
Augie Fackler
formatting: blacken the codebase...
r43346 repo, remote, pullop.heads, abortwhenunrelated=pullop.force
)
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 # And fetch the data.
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 pullheads = pullop.heads or remoteheads
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
# New revisions are written to the changelog. But all other updates
# are deferred. Do those now.
# Ensure all new changesets are draft by default. If the repo is
# publishing, the phase will be adjusted by the loop below.
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if csetres[b'added']:
Joerg Sonnenberger
phases: convert registernew users to use revision sets...
r46375 phases.registernew(
repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
)
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669
# And adjust the phase of all changesets accordingly.
Joerg Sonnenberger
phases: sparsify phase lists...
r45677 for phasenumber, phase in phases.phasenames.items():
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 continue
Augie Fackler
formatting: blacken the codebase...
r43346 phases.advanceboundary(
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 repo,
tr,
phasenumber,
csetres[b'nodesbyphase'][phase],
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 # Write bookmark updates.
Augie Fackler
formatting: blacken the codebase...
r43346 bookmarks.updatefromremote(
repo.ui,
repo,
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 csetres[b'bookmarks'],
Augie Fackler
formatting: blacken the codebase...
r43346 remote.url(),
pullop.gettransaction,
explicit=pullop.explicitbookmarks,
)
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 # We don't properly support shallow changeset and manifest yet. So we apply
# depth limiting locally.
if pullop.depth:
relevantcsetnodes = set()
clnode = repo.changelog.node
Augie Fackler
formatting: blacken the codebase...
r43346 for rev in repo.revs(
b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
):
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 relevantcsetnodes.add(clnode(rev))
csetrelevantfilter = lambda n: n in relevantcsetnodes
else:
csetrelevantfilter = lambda n: True
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 # If obtaining the raw store files, we need to scan the full repo to
# derive all the changesets, manifests, and linkrevs.
if usingrawchangelogandmanifest:
csetsforfiles = []
mnodesforfiles = []
manifestlinkrevs = {}
for rev in repo:
ctx = repo[rev]
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 node = ctx.node()
if not csetrelevantfilter(node):
continue
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 mnode = ctx.manifestnode()
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 csetsforfiles.append(node)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 mnodesforfiles.append(mnode)
manifestlinkrevs[mnode] = rev
else:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
mnodesforfiles = manres[b'added']
manifestlinkrevs = manres[b'linkrevs']
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
Gregory Szorc
exchangev2: fetch file revisions...
r39676 # Find all file nodes referenced by added manifests and fetch those
# revisions.
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
Augie Fackler
formatting: blacken the codebase...
r43346 _fetchfilesfromcsets(
repo,
tr,
remote,
pathfilter,
fnodes,
csetsforfiles,
manifestlinkrevs,
shallow=bool(pullop.depth),
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
def _checkuserawstorefiledata(pullop):
"""Check whether we should use rawstorefiledata command to retrieve data."""
repo = pullop.repo
remote = pullop.remote
# Command to obtain raw store data isn't available.
if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
return False
# Only honor if user requested stream clone operation.
if not pullop.streamclonerequested:
return False
# Only works on empty repos.
if len(repo):
return False
# TODO This is super hacky. There needs to be a storage API for this. We
# also need to check for compatibility with the remote.
if b'revlogv1' not in repo.requirements:
return False
return True
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 def _fetchrawstorefiles(repo, remote):
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 b'rawstorefiledata',
{
b'files': [b'changelog', b'manifestlog'],
},
Augie Fackler
formatting: blacken the codebase...
r43346 ).result()
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
# First object is a summary of files data that follows.
overall = next(objs)
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366 with progress:
progress.update(0)
# Next are pairs of file metadata, data.
while True:
try:
filemeta = next(objs)
except StopIteration:
break
for k in (b'location', b'path', b'size'):
if k not in filemeta:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(b'remote file data missing key: %s') % k
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
if filemeta[b'location'] == b'store':
vfs = repo.svfs
else:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
Martin von Zweigbergk
cleanup: join string literals that are already on one line...
r43387 _(b'invalid location for raw file data: %s')
Augie Fackler
formatting: blacken the codebase...
r43346 % filemeta[b'location']
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
bytesremaining = filemeta[b'size']
with vfs.open(filemeta[b'path'], b'wb') as fh:
while True:
try:
chunk = next(objs)
except StopIteration:
break
bytesremaining -= len(chunk)
if bytesremaining < 0:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'received invalid number of bytes for file '
b'data; expected %d, got extra'
)
% filemeta[b'size']
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
progress.increment(step=len(chunk))
fh.write(chunk)
try:
if chunk.islast:
break
except AttributeError:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'did not receive indefinite length bytestring '
b'for file data'
)
)
Gregory Szorc
exchangev2: support for calling rawstorefiledata to retrieve raw files...
r40366
if bytesremaining:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.Abort(
_(
b'received invalid number of bytes for'
b'file data; expected %d got %d'
)
% (
filemeta[b'size'],
filemeta[b'size'] - bytesremaining,
)
)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
if heads:
knownnode = repo.changelog.hasnode
if all(knownnode(head) for head in heads):
return heads, False, heads
# TODO wire protocol version 2 is capable of more efficient discovery
# than setdiscovery. Consider implementing something better.
common, fetch, remoteheads = setdiscovery.findcommonheads(
Augie Fackler
formatting: blacken the codebase...
r43346 repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
)
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
common = set(common)
remoteheads = set(remoteheads)
# If a remote head is filtered locally, put it back in the common set.
# See the comment in exchange._pulldiscoverychangegroup() for more.
if fetch and remoteheads:
index: use `index.has_node` in `exchangev2._pullchangesetdiscovery`...
r43946 has_node = repo.unfiltered().changelog.index.has_node
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
index: use `index.has_node` in `exchangev2._pullchangesetdiscovery`...
r43946 common |= {head for head in remoteheads if has_node(head)}
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665
if set(remoteheads).issubset(common):
fetch = []
common.discard(nullid)
return common, fetch, remoteheads
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
# TODO consider adding a step here where we obtain the DAG shape first
# (or ask the server to slice changesets into chunks for us) so that
# we can perform multiple fetches in batches. This will facilitate
# resuming interrupted clones, higher server-side cache hit rates due
# to smaller segments, etc.
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
b'changesetdata',
{
b'revisions': [
{
b'type': b'changesetdagrange',
b'roots': sorted(common),
b'heads': sorted(remoteheads),
}
],
b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
},
).result()
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
# The context manager waits on all response data when exiting. So
# we need to remain in the context manager in order to stream data.
return _processchangesetdata(repo, tr, objs)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def _processchangesetdata(repo, tr, objs):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
urepo = repo.unfiltered()
cl = urepo.changelog
cl.delayupdate(tr)
# The first emitted object is a header describing the data that
# follows.
meta = next(objs)
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 manifestnodes = {}
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 added = []
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def linkrev(node):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 repo.ui.debug(b'add changeset %s\n' % short(node))
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 # Linkrev for changelog is always self.
return len(cl)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 def ondupchangeset(cl, node):
added.append(node)
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def onchangeset(cl, node):
progress.increment()
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 revision = cl.changelogrevision(node)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 added.append(node)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
# We need to preserve the mapping of changelog revision to node
# so we can set the linkrev accordingly when manifests are added.
manifestnodes[cl.rev(node)] = revision.manifest
Joerg Sonnenberger
phases: sparsify phase lists...
r45677 nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 remotebookmarks = {}
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 # addgroup() expects a 7-tuple describing revisions. This normalizes
# the wire data to that format.
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 #
# This loop also aggregates non-revision metadata, such as phase
# data.
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 def iterrevisions():
for cset in objs:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 node = cset[b'node']
if b'phase' in cset:
nodesbyphase[cset[b'phase']].add(node)
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 for mark in cset.get(b'bookmarks', []):
remotebookmarks[mark] = node
Gregory Szorc
wireprotov2: add TODOs around extending changesetdata fields...
r39672 # TODO add mechanism for extensions to examine records so they
# can siphon off custom data fields.
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in cset.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 # Some entries might only be metadata only updates.
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 if b'revision' not in extrafields:
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 continue
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 data = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
yield (
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 node,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 cset[b'parents'][0],
cset[b'parents'][1],
# Linknode is always itself for changesets.
cset[b'node'],
# We always send full revisions. So delta base is not set.
nullid,
mdiff.trivialdiffheader(len(data)) + data,
# Flags not yet supported.
0,
)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 cl.addgroup(
iterrevisions(),
linkrev,
weakref.proxy(tr),
addrevisioncb=onchangeset,
duplicaterevisioncb=ondupchangeset,
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667
progress.complete()
return {
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'added': added,
b'nodesbyphase': nodesbyphase,
b'bookmarks': remotebookmarks,
b'manifestnodes': manifestnodes,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 }
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 def _fetchmanifests(repo, tr, remote, manifestnodes):
rootmanifest = repo.manifestlog.getstorage(b'')
# Some manifests can be shared between changesets. Filter out revisions
# we already know about.
fetchnodes = []
linkrevs = {}
seen = set()
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 if node in seen:
continue
try:
rootmanifest.rev(node)
except error.LookupError:
fetchnodes.append(node)
linkrevs[node] = clrev
seen.add(node)
# TODO handle tree manifests
# addgroup() expects 7-tuple describing revisions. This normalizes
# the wire data to that format.
def iterrevisions(objs, progress):
for manifest in objs:
node = manifest[b'node']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in manifest.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 basenode = manifest[b'deltabasenode']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 delta = extrafields[b'delta']
elif b'revision' in extrafields:
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 basenode = nullid
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 revision = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
manifest[b'parents'][0],
manifest[b'parents'][1],
# The value passed in is passed to the lookup function passed
# to addgroup(). We already have a map of manifest node to
# changelog revision number. So we just pass in the
# manifest node here and use linkrevs.__getitem__ as the
# resolution function.
node,
basenode,
delta,
# Flags not yet supported.
Augie Fackler
formatting: blacken the codebase...
r43346 0,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 )
progress.increment()
Augie Fackler
formatting: blacken the codebase...
r43346 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
Gregory Szorc
exchangev2: honor server advertised manifestdata recommended batch size...
r40209 commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 # TODO make size configurable on client?
# We send commands 1 at a time to the remote. This is not the most
# efficient because we incur a round trip at the end of each batch.
# However, the existing frame-based reactor keeps consuming server
# data in the background. And this results in response data buffering
# in memory. This can consume gigabytes of memory.
# TODO send multiple commands in a request once background buffering
# issues are resolved.
added = []
for i in pycompat.xrange(0, len(fetchnodes), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [node for node in fetchnodes[i : i + batchsize]]
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 if not batch:
continue
with remote.commandexecutor() as e:
Augie Fackler
formatting: blacken the codebase...
r43346 objs = e.callcommand(
b'manifestdata',
{
b'tree': b'',
b'nodes': batch,
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
).result()
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
# Chomp off header object.
next(objs)
Joerg Sonnenberger
revlog: extend addgroup() with callback for duplicates...
r46373 def onchangeset(cl, node):
added.append(node)
rootmanifest.addgroup(
iterrevisions(objs, progress),
linkrevs.__getitem__,
weakref.proxy(tr),
addrevisioncb=onchangeset,
duplicaterevisioncb=onchangeset,
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674
progress.complete()
return {
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'added': added,
b'linkrevs': linkrevs,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 }
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 def _derivefilesfrommanifests(repo, matcher, manifestnodes):
Gregory Szorc
exchangev2: fetch file revisions...
r39676 """Determine what file nodes are relevant given a set of manifest nodes.
Returns a dict mapping file paths to dicts of file node to first manifest
node.
"""
ml = repo.manifestlog
fnodes = collections.defaultdict(dict)
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'scanning manifests'), total=len(manifestnodes)
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071
with progress:
for manifestnode in manifestnodes:
m = ml.get(b'', manifestnode)
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 # TODO this will pull in unwanted nodes because it takes the storage
# delta into consideration. What we really want is something that
# takes the delta between the manifest's parents. And ideally we
# would ignore file nodes that are known locally. For now, ignore
# both these limitations. This will result in incremental fetches
# requesting data we already have. So this is far from ideal.
md = m.readfast()
Gregory Szorc
exchangev2: fetch file revisions...
r39676
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071 for path, fnode in md.items():
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 if matcher(path):
fnodes[path].setdefault(fnode, manifestnode)
Gregory Szorc
exchangev2: add progress bar around manifest scanning...
r40071
progress.increment()
Gregory Szorc
exchangev2: fetch file revisions...
r39676
return fnodes
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch file revisions...
r39676 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
Gregory Szorc
exchangev2: use filesdata...
r40215 """Fetch file data from explicit file revisions."""
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
exchangev2: fetch file revisions...
r39676 def iterrevisions(objs, progress):
for filerevision in objs:
node = filerevision[b'node']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
Gregory Szorc
exchangev2: fetch file revisions...
r39676 basenode = filerevision[b'deltabasenode']
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 delta = extrafields[b'delta']
elif b'revision' in extrafields:
Gregory Szorc
exchangev2: fetch file revisions...
r39676 basenode = nullid
Gregory Szorc
wireprotov2: allow multiple fields to follow revision maps...
r39839 revision = extrafields[b'revision']
Gregory Szorc
exchangev2: fetch file revisions...
r39676 delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
node,
basenode,
delta,
# Flags not yet supported.
0,
)
progress.increment()
progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'files'),
unit=_(b'chunks'),
Gregory Szorc
py3: define and use pycompat.itervalues()...
r43374 total=sum(len(v) for v in pycompat.itervalues(fnodes)),
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: fetch file revisions...
r39676
# TODO make batch size configurable
batchsize = 10000
fnodeslist = [x for x in sorted(fnodes.items())]
for i in pycompat.xrange(0, len(fnodeslist), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [x for x in fnodeslist[i : i + batchsize]]
Gregory Szorc
exchangev2: fetch file revisions...
r39676 if not batch:
continue
with remote.commandexecutor() as e:
fs = []
locallinkrevs = {}
for path, nodes in batch:
Augie Fackler
formatting: blacken the codebase...
r43346 fs.append(
(
path,
e.callcommand(
b'filedata',
{
b'path': path,
b'nodes': sorted(nodes),
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
),
)
)
Gregory Szorc
exchangev2: fetch file revisions...
r39676
locallinkrevs[path] = {
node: linkrevs[manifestnode]
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for node, manifestnode in pycompat.iteritems(nodes)
Augie Fackler
formatting: blacken the codebase...
r43346 }
Gregory Szorc
exchangev2: fetch file revisions...
r39676
for path, f in fs:
objs = f.result()
# Chomp off header objects.
next(objs)
store = repo.file(path)
store.addgroup(
iterrevisions(objs, progress),
locallinkrevs[path].__getitem__,
Augie Fackler
formatting: blacken the codebase...
r43346 weakref.proxy(tr),
)
Gregory Szorc
exchangev2: use filesdata...
r40215
Augie Fackler
formatting: blacken the codebase...
r43346 def _fetchfilesfromcsets(
repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
Gregory Szorc
exchangev2: use filesdata...
r40215 """Fetch file data from explicit changeset revisions."""
def iterrevisions(objs, remaining, progress):
while remaining:
filerevision = next(objs)
node = filerevision[b'node']
extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
basenode = filerevision[b'deltabasenode']
delta = extrafields[b'delta']
elif b'revision' in extrafields:
basenode = nullid
revision = extrafields[b'revision']
delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 if b'linknode' in filerevision:
linknode = filerevision[b'linknode']
else:
linknode = node
Gregory Szorc
exchangev2: use filesdata...
r40215 yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 linknode,
Gregory Szorc
exchangev2: use filesdata...
r40215 basenode,
delta,
# Flags not yet supported.
0,
)
progress.increment()
remaining -= 1
progress = repo.ui.makeprogress(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'files'),
unit=_(b'chunks'),
Gregory Szorc
py3: define and use pycompat.itervalues()...
r43374 total=sum(len(v) for v in pycompat.itervalues(fnodes)),
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
exchangev2: use filesdata...
r40215
commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
fields = {b'parents', b'revision'}
clrev = repo.changelog.rev
# There are no guarantees that we'll have ancestor revisions if
# a) this repo has shallow file storage b) shallow data fetching is enabled.
# Force remote to not delta against possibly unknown revisions when these
# conditions hold.
haveparents = not (shallowfiles or shallow)
# Similarly, we may not have calculated linkrevs for all incoming file
# revisions. Ask the remote to do work for us in this case.
if not haveparents:
fields.add(b'linknode')
Gregory Szorc
exchangev2: use filesdata...
r40215 for i in pycompat.xrange(0, len(csets), batchsize):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = [x for x in csets[i : i + batchsize]]
Gregory Szorc
exchangev2: use filesdata...
r40215 if not batch:
continue
with remote.commandexecutor() as e:
args = {
Augie Fackler
formatting: blacken the codebase...
r43346 b'revisions': [
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 {
b'type': b'changesetexplicit',
b'nodes': batch,
}
Augie Fackler
formatting: blacken the codebase...
r43346 ],
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 b'fields': fields,
b'haveparents': haveparents,
Gregory Szorc
exchangev2: use filesdata...
r40215 }
Gregory Szorc
exchangev2: recognize narrow patterns when pulling...
r40363 if pathfilter:
args[b'pathfilter'] = pathfilter
Gregory Szorc
exchangev2: use filesdata...
r40215 objs = e.callcommand(b'filesdata', args).result()
# First object is an overall header.
overall = next(objs)
# We have overall['totalpaths'] segments.
for i in pycompat.xrange(overall[b'totalpaths']):
header = next(objs)
path = header[b'path']
store = repo.file(path)
linkrevs = {
fnode: manlinkrevs[mnode]
Gregory Szorc
py3: finish porting iteritems() to pycompat and remove source transformer...
r43376 for fnode, mnode in pycompat.iteritems(fnodes[path])
Augie Fackler
formatting: blacken the codebase...
r43346 }
Gregory Szorc
exchangev2: use filesdata...
r40215
Gregory Szorc
exchangev2: support fetching shallow files history...
r40429 def getlinkrev(node):
if node in linkrevs:
return linkrevs[node]
else:
return clrev(node)
Augie Fackler
formatting: blacken the codebase...
r43346 store.addgroup(
iterrevisions(objs, header[b'totalitems'], progress),
getlinkrev,
weakref.proxy(tr),
maybemissingparents=shallow,
)