##// END OF EJS Templates
localrepo: iteratively derive local repository type...
localrepo: iteratively derive local repository type This commit implements the dynamic local repository type derivation that was explained in the recent commit bfeab472e3c0 "localrepo: create new function for instantiating a local repo object." Instead of a static localrepository class/type which must be customized after construction, we now dynamically construct a type by building up base classes/types to represent specific repository interfaces. Conceptually, the end state is similar to what was happening when various extensions would monkeypatch the __class__ of newly-constructed repo instances. However, the approach is inverted. Instead of making the instance then customizing it, we do the customization up front by influencing the behavior of the type then we instantiate that custom type. This approach gives us much more flexibility. For example, we can use completely separate classes for implementing different aspects of the repository. For example, we could have one class representing revlog-based file storage and another representing non-revlog based file storage. We then choose which implementation to use based on the presence of repo requirements. A concern with this approach is that it creates a lot more types and complexity and that complexity adds overhead. Yes, it is true that this approach will result in more types being created. Yes, this is more complicated than traditional "instantiate a static type." However, I believe the alternatives to supporting alternate storage backends are just as complicated. (Before I arrived at this solution, I had patches storing factory functions on local repo instances for e.g. constructing a file storage instance. We ended up having a handful of these. And this was logically identical to assigning custom methods. Since we were logically changing the type of the instance, I figured it would be better to just use specialized types instead of introducing levels of abstraction at run-time.) 
On the performance front, I don't believe that having N base classes has any significant performance overhead compared to just a single base class. Intuition says that Python will need to iterate the base classes to find an attribute. However, CPython caches method lookups: as long as the __class__ or MRO isn't changing, method attribute lookup should be constant time after first access. And non-method attributes are stored in __dict__, of which there is only 1 per object, so the number of base classes for __dict__ is irrelevant. Anyway, this commit splits up the monolithic completelocalrepository interface into sub-interfaces: 1 for file storage and 1 representing everything else. We've taught ``makelocalrepository()`` to call a series of factory functions which will produce types implementing specific interfaces. It then calls type() to create a new type from the built-up list of base types. This commit should be considered a start and not the end state. I suspect we'll hit a number of problems as we start to implement alternate storage backends: * Passing custom arguments to __init__ and setting custom attributes on __dict__. * Customizing the set of interfaces that are needed. e.g. the "readonly" intent could translate to not requesting an interface providing methods related to writing. * More ergonomic way for extensions to insert themselves so their callbacks aren't unconditionally called. * Wanting to modify vfs instances, other arguments passed to __init__. That being said, this code is usable in its current state and I'm convinced future commits will demonstrate the value in this approach. Differential Revision: https://phab.mercurial-scm.org/D4642

File last commit:

r39677:aa7e3123 default
r39800:e4e88157 default
Show More
exchangev2.py
396 lines | 12.8 KiB | text/x-python | PythonLexer
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import collections
import weakref
from .i18n import _
from .node import (
nullid,
short,
)
from . import (
bookmarks,
error,
mdiff,
phases,
pycompat,
setdiscovery,
)
def pull(pullop):
    """Pull using wire protocol version 2.

    Reads the local repo, remote peer, requested heads, force flag,
    transaction manager and explicit bookmarks from ``pullop``. Changesets
    are fetched first; phases and bookmarks are then updated from the
    changeset response; finally manifests and file revisions are fetched.
    """
    repo = pullop.repo
    remote = pullop.remote
    tr = pullop.trmanager.transaction()

    # Figure out what needs to be fetched.
    #
    # Discovery should only tolerate an unrelated remote when the pull is
    # forced, so the abort flag is the negation of ``pullop.force``. Passing
    # the force flag through unchanged inverted that behavior.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=not pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, manres['added'])
    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled.

    Returns a ``(common, fetch, remoteheads)`` tuple. When every requested
    head is already present in the local changelog, no discovery is run and
    ``(heads, False, heads)`` is returned.
    """
    if heads:
        hasnode = repo.changelog.hasnode
        if all(hasnode(h) for h in heads):
            # Everything that was asked for is already here.
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    rawcommon, fetch, rawremoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(rawcommon)
    remoteheads = set(rawremoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.
    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap
        common.update(h for h in remoteheads if h in nodemap)

        # All remote heads are known (possibly filtered): nothing to fetch.
        if remoteheads.issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads
def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    """Issue a ``changesetdata`` command and apply its response.

    The node range sent to the remote is built from the sorted ``common``
    and ``remoteheads`` collections. The return value is whatever
    ``_processchangesetdata()`` returns for the response objects.
    """
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as executor:
        request = {
            b'noderange': [sorted(common), sorted(remoteheads)],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }
        response = executor.callcommand(b'changesetdata', request)
        objs = response.result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)
def _processchangesetdata(repo, tr, objs):
    """Add changesets from a ``changesetdata`` response to the changelog.

    ``objs`` is an iterator whose first item is a header object. Each
    following changeset object that carries ``revisionsize`` is immediately
    followed by the raw revision data.

    Returns a dict with the keys ``added`` (result of the changelog's
    ``addgroup()``), ``nodesbyphase`` (phase name -> set of nodes),
    ``bookmarks`` (bookmark -> node) and ``manifestnodes`` (changelog
    revision -> manifest node).
    """
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    changelog = repo.unfiltered().changelog
    changelog.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    header = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=header.get(b'totalitems'))

    # Accumulators filled in while the revisions are being consumed.
    manifestnodes = {}
    nodesbyphase = {name: set() for name in phases.phasenames}
    remotebookmarks = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(changelog)

    def onchangeset(cl, node):
        progress.increment()

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = cl.changelogrevision(node).manifest

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for bookmark in cset.get(b'bookmarks', []):
                remotebookmarks[bookmark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            # Some entries might only be metadata only updates.
            if b'revisionsize' not in cset:
                continue

            # The raw revision follows its metadata object on the stream.
            rawtext = next(objs)
            parents = cset[b'parents']

            yield (
                node,
                parents[0],
                parents[1],
                # Linknode is always itself for changesets.
                node,
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(rawtext)) + rawtext,
                # Flags not yet supported.
                0,
            )

    added = changelog.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                               addrevisioncb=onchangeset)

    progress.complete()

    result = {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }
    return result
def _fetchmanifests(repo, tr, remote, manifestnodes):
    """Fetch root manifest revisions that are not yet stored locally.

    ``manifestnodes`` maps changelog revision numbers to manifest nodes.
    Nodes the root manifest storage cannot resolve are requested from the
    remote with ``manifestdata`` commands, in batches, and added to the
    storage.

    Returns a dict with the keys ``added`` (the accumulated results of the
    storage's ``addgroup()`` calls) and ``linkrevs`` (fetched manifest node
    -> changelog revision it is linked to).
    """
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    # ``items()`` instead of ``iteritems()``: the latter does not exist on
    # Python 3 dicts, and the result is fed to sorted() either way.
    for clrev, node in sorted(manifestnodes.items()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            # Unknown locally. Iteration is in ascending changelog revision
            # order, so the first changeset referencing the node wins.
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            if b'deltasize' in manifest:
                basenode = manifest[b'deltabasenode']
                delta = next(objs)
            elif b'revisionsize' in manifest:
                basenode = nullid
                revision = next(objs)
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                # Metadata only: no revision payload follows.
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    # Fetch manifests 10,000 per command.
    # TODO have server advertise preferences?
    # TODO make size configurable on client?
    batchsize = 10000

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        # Every start offset is below len(fetchnodes), so the slice is never
        # empty and is already a fresh list.
        batch = fetchnodes[i:i + batchsize]

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }
def _derivefilesfrommanifests(repo, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    manifestlog = repo.manifestlog
    filenodes = collections.defaultdict(dict)

    for mnode in manifestnodes:
        # TODO this will pull in unwanted nodes because it takes the storage
        # delta into consideration. What we really want is something that takes
        # the delta between the manifest's parents. And ideally we would
        # ignore file nodes that are known locally. For now, ignore both
        # these limitations. This will result in incremental fetches requesting
        # data we already have. So this is far from ideal.
        entries = manifestlog.get(b'', mnode).readfast()

        for path, fnode in entries.items():
            bynode = filenodes[path]
            # Only the first manifest seen for a file node is recorded.
            if fnode not in bynode:
                bynode[fnode] = mnode

    return filenodes
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file revisions from the remote and add them to file storage.

    ``fnodes`` maps file paths to dicts of file node -> manifest node.
    ``linkrevs`` maps manifest nodes to changelog revisions; it is used to
    resolve the linkrev of every file node through its manifest node.

    One ``filedata`` command is issued per path, with paths grouped into
    batches that share a command executor.
    """
    # addgroup() is fed 7-tuples describing revisions. This normalizes the
    # wire data to that format.
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            if b'deltasize' in filerevision:
                basenode = filerevision[b'deltabasenode']
                delta = next(objs)
            elif b'revisionsize' in filerevision:
                basenode = nullid
                revision = next(objs)
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                # Metadata only: no revision payload follows.
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                # Resolved to a linkrev by the lookup function handed to
                # addgroup().
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    # ``values()``/``items()`` instead of ``itervalues()``/``iteritems()``:
    # the iterator variants do not exist on Python 3 dicts.
    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.values()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        # Every start offset is below len(fnodeslist), so the slice is never
        # empty and is already a fresh list.
        batch = fnodeslist[i:i + batchsize]

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            # Queue every command of the batch before reading any result.
            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.items()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))