##// END OF EJS Templates
revlog: rewrite censoring logic...
revlog: rewrite censoring logic I was able to corrupt a revlog relatively easily with the existing censoring code. The underlying problem is that the existing code doesn't fully take delta chains into account. When copying revisions that occur after the censored revision, the delta base can refer to a censored revision. Then at read time, things blow up due to the revision data not being a compressed delta. This commit rewrites the revlog censoring code to take a higher-level approach. We now create a new revlog instance pointing at temp files. We iterate through each revision in the source revlog and insert those revisions into the new revlog, replacing the censored revision's data along the way. The new implementation isn't as efficient as the old one. This is because it will fully engage delta computation on insertion. But I don't think it matters. The new implementation is a bit hacky because it attempts to reload the revlog instance with a new revlog index/data file. This is fragile. But this is needed because the index (which could be backed by C) would have a cached copy of the old, possibly changed data and that could lead to problems accessing index or revision data later. One benefit of the new approach is that we integrate with the transaction. The old revlog is backed up and if the transaction is rolled back, the original revlog is restored. As part of this, we had to teach the transaction about the store vfs. I'm not super keen about this. But this was the easiest way to hook things up to the transaction. We /could/ just ignore the transaction like we were doing before. But any file mutation should be governed by transaction semantics, including undo during rollback. Differential Revision: https://phab.mercurial-scm.org/D4869

File last commit:

r40071:7a347d36 default
r40092:324b4b10 default
Show More
exchangev2.py
417 lines | 13.5 KiB | text/x-python | PythonLexer
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 # exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
Gregory Szorc
exchangev2: fetch file revisions...
r39676 import collections
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 import weakref
from .i18n import _
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 from .node import (
nullid,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 short,
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 )
from . import (
Gregory Szorc
exchangev2: fetch and apply bookmarks...
r39671 bookmarks,
Gregory Szorc
exchangev2: fetch manifest revisions...
r39674 error,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 mdiff,
Gregory Szorc
exchangev2: fetch and apply phases data...
r39669 phases,
Gregory Szorc
exchangev2: fetch changeset revisions...
r39667 pycompat,
Gregory Szorc
exchangev2: start to implement pull with wire protocol v2...
r39665 setdiscovery,
)
def pull(pullop):
    """Pull using wire protocol version 2.

    Runs changeset discovery against ``pullop.remote``, fetches the missing
    changesets, applies phase and bookmark updates derived from the
    changeset data, and then fetches the manifests and file revisions
    referenced by the newly added changesets. The transaction obtained from
    ``pullop.trmanager`` is handed to every step that writes data.
    """
    repo = pullop.repo
    remote = pullop.remote
    tr = pullop.trmanager.transaction()

    # Figure out what needs to be fetched.
    # NOTE(review): ``pullop.force`` is forwarded unchanged as
    # ``abortwhenunrelated``. If "force" is meant to *allow* pulling from an
    # unrelated repository this looks inverted -- confirm against
    # setdiscovery.findcommonheads() before changing.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, manres['added'])
    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled.

    Returns a 3-tuple ``(common, fetch, remoteheads)``.

    When ``heads`` is non-empty and every requested head is already known to
    the local changelog, discovery is skipped and ``(heads, False, heads)``
    is returned. Otherwise the result of ``setdiscovery.findcommonheads()``
    is returned with ``common`` and ``remoteheads`` converted to sets and
    ``nullid`` removed from ``common``.
    """
    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.
    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        # Every remote head is known locally once filtered revisions are
        # taken into account, so there is nothing left to fetch.
        if remoteheads.issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads
def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    """Request changeset data from the remote and apply it locally.

    Issues a ``changesetdata`` command for the node range bounded by
    ``common`` and ``remoteheads`` and returns whatever
    ``_processchangesetdata()`` returns for the response objects.

    ``fetch`` is accepted for signature symmetry with discovery but is not
    consulted here.
    """
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'noderange': [sorted(common), sorted(remoteheads)],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)
def _processchangesetdata(repo, tr, objs):
    """Apply a stream of ``changesetdata`` response objects to the changelog.

    ``objs`` is an iterator whose first item is a header mapping (its
    ``totalitems`` key, if present, sizes the progress bar) followed by one
    mapping per changeset. Each changeset mapping may be followed by extra
    objects announced through its ``fieldsfollowing`` key.

    Returns a dict with the keys:

    ``added``
        The value returned by ``changelog.addgroup()``.
    ``nodesbyphase``
        Mapping of phase name to the set of nodes reported in that phase.
    ``bookmarks``
        Mapping of bookmark name to node as reported by the remote.
    ``manifestnodes``
        Mapping of changelog revision number to manifest node for every
        changeset added.
    """
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            # Each announced field is delivered as the next object in the
            # stream, so they must be consumed even if we end up skipping
            # this entry.
            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might only be metadata only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }
def _fetchmanifests(repo, tr, remote, manifestnodes):
    """Fetch root manifest revisions that are not yet stored locally.

    ``manifestnodes`` maps changelog revision numbers to manifest nodes.
    Nodes already present in the root manifest storage are skipped; the
    rest are requested from the remote in batches via the ``manifestdata``
    command and added to storage.

    Returns a dict with the keys:

    ``added``
        Accumulated return values of ``addgroup()`` across all batches.
    ``linkrevs``
        Mapping of fetched manifest node to the changelog revision number
        it was first seen with (iteration is in sorted revision order).
    """
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            # Each announced field arrives as the next object in the stream.
            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                # A full revision is expressed as a delta against null.
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    # Fetch manifests 10,000 per command.
    # TODO have server advertise preferences?
    # TODO make size configurable on client?
    batchsize = 10000

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    # The range only yields offsets below len(fetchnodes), so every slice
    # holds at least one node.
    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = fetchnodes[i:i + batchsize]

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            # Consume the response while still inside the executor context
            # so the data is streamed.
            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }
def _derivefilesfrommanifests(repo, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _('scanning manifests'), total=len(manifestnodes))

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the
            # storage delta into consideration. What we really want is
            # something that takes the delta between the manifest's parents.
            # And ideally we would ignore file nodes that are known locally.
            # For now, ignore both these limitations. This will result in
            # incremental fetches requesting data we already have. So this
            # is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                # setdefault() keeps the first manifest node that introduced
                # this file node.
                fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file revisions and add them to the per-file stores.

    ``fnodes`` maps file paths to dicts of file node to manifest node.
    ``linkrevs`` maps manifest nodes to changelog revision numbers; it is
    used to resolve the linkrev of each file revision through the manifest
    node that referenced it.

    Paths are processed in sorted order, in batches, with one ``filedata``
    command issued per path.
    """
    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            # Each announced field arrives as the next object in the stream.
            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                # A full revision is expressed as a delta against null.
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                # Passed to the linkrev lookup function given to addgroup().
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    # NOTE(review): unlike _fetchmanifests(), nothing in the visible part of
    # this function calls progress.complete() -- confirm whether that is
    # intentional.
    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    # The range only yields offsets below len(fnodeslist), so every slice
    # holds at least one entry.
    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = fnodeslist[i:i + batchsize]

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            # Queue one command per path before consuming any results.
            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))