##// END OF EJS Templates
procutil: avoid using os.fork() to implement runbgcommand...
procutil: avoid using os.fork() to implement runbgcommand We ran into the following deadlock: - some command creates an ssh peer, then raises without explicitly closing the peer (hg id + extension in our case) - dispatch catches the exception, calls ui.log('commandfinish', ..) (the sshpeer is still not closed), which calls logtoprocess, which calls procutil.runbgcommand. - in the child of runbgcommand's fork(), between the fork and the exec, the opening of file descriptors triggers a gc which runs the destructor for sshpeer, which waits on ssh's stderr being closed, which never happens since ssh's stderr is held open by the parent of the fork where said destructor hasn't run Remotefilelog appears to have a hack around this deadlock as well. I don't know if there's more subtlety to it, because even though the problem is determistic, it is very fragile, so I didn't manage to reduce it. I can imagine three ways of tackling this problem: 1. don't run any python between fork and exec in runbgcommand 2. make the finalizer harmless after the fork 3. close the peer without relying on gc behavior This commit goes with 1, as forking without exec'ing is tricky in general in a language with gc finalizers. And maybe it's better in the presence of rust threads. A future commit will try 2 or 3. Performance wise: at low memory usage, it's an improvement. At higher memory usage, it's about 2x faster than before when ensurestart=True, but 2x slower when ensurestart=False. Not sure if that matters. The reason for that last bit is that the subprocess.Popen always waits for the execve to finish, and at high memory usage, execve is slow because it deallocates the large page table. Numbers and script: before after mem=1.0GB, ensurestart=True 52.1ms 26.0ms mem=1.0GB, ensurestart=False 14.7ms 26.0ms mem=0.5GB, ensurestart=True 23.2ms 11.2ms mem=0.5GB, ensurestart=False 6.2ms 11.3ms mem=0.2GB, ensurestart=True 15.7ms 7.4ms mem=0.2GB, ensurestart=False 4.3ms 8.1ms mem=0.0GB, ensurestart=True 2.3ms 0.7ms mem=0.0GB, ensurestart=False 0.8ms 0.8ms import time for memsize in [1_000_000_000, 500_000_000, 250_000_000, 0]: mem = 'a' * memsize for ensurestart in [True, False]: now = time.time() n = 100 for i in range(n): procutil.runbgcommand([b'true'], {}, ensurestart=ensurestart) after = time.time() ms = (after - now) / float(n) * 1000 print(f'mem={memsize / 1e9:.1f}GB, ensurestart={ensurestart} -> {ms:.1f}ms') Differential Revision: https://phab.mercurial-scm.org/D9019

File last commit:

r47445:a41565be default
r47651:8759e22f default
Show More
exchangev2.py
807 lines | 24.9 KiB | text/x-python | PythonLexer
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import collections
import weakref
from .i18n import _
from .node import (
nullid,
short,
)
from . import (
bookmarks,
error,
mdiff,
narrowspec,
phases,
pycompat,
requirements as requirementsmod,
setdiscovery,
)
from .interfaces import repository
def pull(pullop):
"""Pull using wire protocol version 2."""
repo = pullop.repo
remote = pullop.remote
usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
# If this is a clone and it was requested to perform a "stream clone",
# we obtain the raw files data from the remote then fall back to an
# incremental pull. This is somewhat hacky and is not nearly robust enough
# for long-term usage.
if usingrawchangelogandmanifest:
with repo.transaction(b'clone'):
_fetchrawstorefiles(repo, remote)
repo.invalidate(clearfilecache=True)
tr = pullop.trmanager.transaction()
# We don't use the repo's narrow matcher here because the patterns passed
# to exchange.pull() could be different.
narrowmatcher = narrowspec.match(
repo.root,
# Empty maps to nevermatcher. So always
# set includes if missing.
pullop.includepats or {b'path:.'},
pullop.excludepats,
)
if pullop.includepats or pullop.excludepats:
pathfilter = {}
if pullop.includepats:
pathfilter[b'include'] = sorted(pullop.includepats)
if pullop.excludepats:
pathfilter[b'exclude'] = sorted(pullop.excludepats)
else:
pathfilter = None
# Figure out what needs to be fetched.
common, fetch, remoteheads = _pullchangesetdiscovery(
repo, remote, pullop.heads, abortwhenunrelated=pullop.force
)
# And fetch the data.
pullheads = pullop.heads or remoteheads
csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
# New revisions are written to the changelog. But all other updates
# are deferred. Do those now.
# Ensure all new changesets are draft by default. If the repo is
# publishing, the phase will be adjusted by the loop below.
if csetres[b'added']:
phases.registernew(
repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
)
# And adjust the phase of all changesets accordingly.
for phasenumber, phase in phases.phasenames.items():
if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
continue
phases.advanceboundary(
repo,
tr,
phasenumber,
csetres[b'nodesbyphase'][phase],
)
# Write bookmark updates.
bookmarks.updatefromremote(
repo.ui,
repo,
csetres[b'bookmarks'],
remote.url(),
pullop.gettransaction,
explicit=pullop.explicitbookmarks,
)
manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])
# We don't properly support shallow changeset and manifest yet. So we apply
# depth limiting locally.
if pullop.depth:
relevantcsetnodes = set()
clnode = repo.changelog.node
for rev in repo.revs(
b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
):
relevantcsetnodes.add(clnode(rev))
csetrelevantfilter = lambda n: n in relevantcsetnodes
else:
csetrelevantfilter = lambda n: True
# If obtaining the raw store files, we need to scan the full repo to
# derive all the changesets, manifests, and linkrevs.
if usingrawchangelogandmanifest:
csetsforfiles = []
mnodesforfiles = []
manifestlinkrevs = {}
for rev in repo:
ctx = repo[rev]
node = ctx.node()
if not csetrelevantfilter(node):
continue
mnode = ctx.manifestnode()
csetsforfiles.append(node)
mnodesforfiles.append(mnode)
manifestlinkrevs[mnode] = rev
else:
csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
mnodesforfiles = manres[b'added']
manifestlinkrevs = manres[b'linkrevs']
# Find all file nodes referenced by added manifests and fetch those
# revisions.
fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
_fetchfilesfromcsets(
repo,
tr,
remote,
pathfilter,
fnodes,
csetsforfiles,
manifestlinkrevs,
shallow=bool(pullop.depth),
)
def _checkuserawstorefiledata(pullop):
"""Check whether we should use rawstorefiledata command to retrieve data."""
repo = pullop.repo
remote = pullop.remote
# Command to obtain raw store data isn't available.
if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
return False
# Only honor if user requested stream clone operation.
if not pullop.streamclonerequested:
return False
# Only works on empty repos.
if len(repo):
return False
# TODO This is super hacky. There needs to be a storage API for this. We
# also need to check for compatibility with the remote.
if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements:
return False
return True
def _fetchrawstorefiles(repo, remote):
with remote.commandexecutor() as e:
objs = e.callcommand(
b'rawstorefiledata',
{
b'files': [b'changelog', b'manifestlog'],
},
).result()
# First object is a summary of files data that follows.
overall = next(objs)
progress = repo.ui.makeprogress(
_(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
)
with progress:
progress.update(0)
# Next are pairs of file metadata, data.
while True:
try:
filemeta = next(objs)
except StopIteration:
break
for k in (b'location', b'path', b'size'):
if k not in filemeta:
raise error.Abort(
_(b'remote file data missing key: %s') % k
)
if filemeta[b'location'] == b'store':
vfs = repo.svfs
else:
raise error.Abort(
_(b'invalid location for raw file data: %s')
% filemeta[b'location']
)
bytesremaining = filemeta[b'size']
with vfs.open(filemeta[b'path'], b'wb') as fh:
while True:
try:
chunk = next(objs)
except StopIteration:
break
bytesremaining -= len(chunk)
if bytesremaining < 0:
raise error.Abort(
_(
b'received invalid number of bytes for file '
b'data; expected %d, got extra'
)
% filemeta[b'size']
)
progress.increment(step=len(chunk))
fh.write(chunk)
try:
if chunk.islast:
break
except AttributeError:
raise error.Abort(
_(
b'did not receive indefinite length bytestring '
b'for file data'
)
)
if bytesremaining:
raise error.Abort(
_(
b'received invalid number of bytes for'
b'file data; expected %d got %d'
)
% (
filemeta[b'size'],
filemeta[b'size'] - bytesremaining,
)
)
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
if heads:
knownnode = repo.changelog.hasnode
if all(knownnode(head) for head in heads):
return heads, False, heads
# TODO wire protocol version 2 is capable of more efficient discovery
# than setdiscovery. Consider implementing something better.
common, fetch, remoteheads = setdiscovery.findcommonheads(
repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
)
common = set(common)
remoteheads = set(remoteheads)
# If a remote head is filtered locally, put it back in the common set.
# See the comment in exchange._pulldiscoverychangegroup() for more.
if fetch and remoteheads:
has_node = repo.unfiltered().changelog.index.has_node
common |= {head for head in remoteheads if has_node(head)}
if set(remoteheads).issubset(common):
fetch = []
common.discard(nullid)
return common, fetch, remoteheads
def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
# TODO consider adding a step here where we obtain the DAG shape first
# (or ask the server to slice changesets into chunks for us) so that
# we can perform multiple fetches in batches. This will facilitate
# resuming interrupted clones, higher server-side cache hit rates due
# to smaller segments, etc.
with remote.commandexecutor() as e:
objs = e.callcommand(
b'changesetdata',
{
b'revisions': [
{
b'type': b'changesetdagrange',
b'roots': sorted(common),
b'heads': sorted(remoteheads),
}
],
b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
},
).result()
# The context manager waits on all response data when exiting. So
# we need to remain in the context manager in order to stream data.
return _processchangesetdata(repo, tr, objs)
def _processchangesetdata(repo, tr, objs):
repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
urepo = repo.unfiltered()
cl = urepo.changelog
cl.delayupdate(tr)
# The first emitted object is a header describing the data that
# follows.
meta = next(objs)
progress = repo.ui.makeprogress(
_(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
)
manifestnodes = {}
added = []
def linkrev(node):
repo.ui.debug(b'add changeset %s\n' % short(node))
# Linkrev for changelog is always self.
return len(cl)
def ondupchangeset(cl, rev):
added.append(cl.node(rev))
def onchangeset(cl, rev):
progress.increment()
revision = cl.changelogrevision(rev)
added.append(cl.node(rev))
# We need to preserve the mapping of changelog revision to node
# so we can set the linkrev accordingly when manifests are added.
manifestnodes[rev] = revision.manifest
repo.register_changeset(rev, revision)
nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
remotebookmarks = {}
# addgroup() expects a 7-tuple describing revisions. This normalizes
# the wire data to that format.
#
# This loop also aggregates non-revision metadata, such as phase
# data.
def iterrevisions():
for cset in objs:
node = cset[b'node']
if b'phase' in cset:
nodesbyphase[cset[b'phase']].add(node)
for mark in cset.get(b'bookmarks', []):
remotebookmarks[mark] = node
# TODO add mechanism for extensions to examine records so they
# can siphon off custom data fields.
extrafields = {}
for field, size in cset.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
# Some entries might only be metadata only updates.
if b'revision' not in extrafields:
continue
data = extrafields[b'revision']
yield (
node,
cset[b'parents'][0],
cset[b'parents'][1],
# Linknode is always itself for changesets.
cset[b'node'],
# We always send full revisions. So delta base is not set.
nullid,
mdiff.trivialdiffheader(len(data)) + data,
# Flags not yet supported.
0,
# Sidedata not yet supported
{},
)
cl.addgroup(
iterrevisions(),
linkrev,
weakref.proxy(tr),
alwayscache=True,
addrevisioncb=onchangeset,
duplicaterevisioncb=ondupchangeset,
)
progress.complete()
return {
b'added': added,
b'nodesbyphase': nodesbyphase,
b'bookmarks': remotebookmarks,
b'manifestnodes': manifestnodes,
}
def _fetchmanifests(repo, tr, remote, manifestnodes):
rootmanifest = repo.manifestlog.getstorage(b'')
# Some manifests can be shared between changesets. Filter out revisions
# we already know about.
fetchnodes = []
linkrevs = {}
seen = set()
for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
if node in seen:
continue
try:
rootmanifest.rev(node)
except error.LookupError:
fetchnodes.append(node)
linkrevs[node] = clrev
seen.add(node)
# TODO handle tree manifests
# addgroup() expects 7-tuple describing revisions. This normalizes
# the wire data to that format.
def iterrevisions(objs, progress):
for manifest in objs:
node = manifest[b'node']
extrafields = {}
for field, size in manifest.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
basenode = manifest[b'deltabasenode']
delta = extrafields[b'delta']
elif b'revision' in extrafields:
basenode = nullid
revision = extrafields[b'revision']
delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
manifest[b'parents'][0],
manifest[b'parents'][1],
# The value passed in is passed to the lookup function passed
# to addgroup(). We already have a map of manifest node to
# changelog revision number. So we just pass in the
# manifest node here and use linkrevs.__getitem__ as the
# resolution function.
node,
basenode,
delta,
# Flags not yet supported.
0,
# Sidedata not yet supported.
{},
)
progress.increment()
progress = repo.ui.makeprogress(
_(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
)
commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
# TODO make size configurable on client?
# We send commands 1 at a time to the remote. This is not the most
# efficient because we incur a round trip at the end of each batch.
# However, the existing frame-based reactor keeps consuming server
# data in the background. And this results in response data buffering
# in memory. This can consume gigabytes of memory.
# TODO send multiple commands in a request once background buffering
# issues are resolved.
added = []
for i in pycompat.xrange(0, len(fetchnodes), batchsize):
batch = [node for node in fetchnodes[i : i + batchsize]]
if not batch:
continue
with remote.commandexecutor() as e:
objs = e.callcommand(
b'manifestdata',
{
b'tree': b'',
b'nodes': batch,
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
).result()
# Chomp off header object.
next(objs)
def onchangeset(cl, rev):
added.append(cl.node(rev))
rootmanifest.addgroup(
iterrevisions(objs, progress),
linkrevs.__getitem__,
weakref.proxy(tr),
addrevisioncb=onchangeset,
duplicaterevisioncb=onchangeset,
)
progress.complete()
return {
b'added': added,
b'linkrevs': linkrevs,
}
def _derivefilesfrommanifests(repo, matcher, manifestnodes):
"""Determine what file nodes are relevant given a set of manifest nodes.
Returns a dict mapping file paths to dicts of file node to first manifest
node.
"""
ml = repo.manifestlog
fnodes = collections.defaultdict(dict)
progress = repo.ui.makeprogress(
_(b'scanning manifests'), total=len(manifestnodes)
)
with progress:
for manifestnode in manifestnodes:
m = ml.get(b'', manifestnode)
# TODO this will pull in unwanted nodes because it takes the storage
# delta into consideration. What we really want is something that
# takes the delta between the manifest's parents. And ideally we
# would ignore file nodes that are known locally. For now, ignore
# both these limitations. This will result in incremental fetches
# requesting data we already have. So this is far from ideal.
md = m.readfast()
for path, fnode in md.items():
if matcher(path):
fnodes[path].setdefault(fnode, manifestnode)
progress.increment()
return fnodes
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
"""Fetch file data from explicit file revisions."""
def iterrevisions(objs, progress):
for filerevision in objs:
node = filerevision[b'node']
extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
basenode = filerevision[b'deltabasenode']
delta = extrafields[b'delta']
elif b'revision' in extrafields:
basenode = nullid
revision = extrafields[b'revision']
delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
node,
basenode,
delta,
# Flags not yet supported.
0,
# Sidedata not yet supported.
{},
)
progress.increment()
progress = repo.ui.makeprogress(
_(b'files'),
unit=_(b'chunks'),
total=sum(len(v) for v in pycompat.itervalues(fnodes)),
)
# TODO make batch size configurable
batchsize = 10000
fnodeslist = [x for x in sorted(fnodes.items())]
for i in pycompat.xrange(0, len(fnodeslist), batchsize):
batch = [x for x in fnodeslist[i : i + batchsize]]
if not batch:
continue
with remote.commandexecutor() as e:
fs = []
locallinkrevs = {}
for path, nodes in batch:
fs.append(
(
path,
e.callcommand(
b'filedata',
{
b'path': path,
b'nodes': sorted(nodes),
b'fields': {b'parents', b'revision'},
b'haveparents': True,
},
),
)
)
locallinkrevs[path] = {
node: linkrevs[manifestnode]
for node, manifestnode in pycompat.iteritems(nodes)
}
for path, f in fs:
objs = f.result()
# Chomp off header objects.
next(objs)
store = repo.file(path)
store.addgroup(
iterrevisions(objs, progress),
locallinkrevs[path].__getitem__,
weakref.proxy(tr),
)
def _fetchfilesfromcsets(
repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
"""Fetch file data from explicit changeset revisions."""
def iterrevisions(objs, remaining, progress):
while remaining:
filerevision = next(objs)
node = filerevision[b'node']
extrafields = {}
for field, size in filerevision.get(b'fieldsfollowing', []):
extrafields[field] = next(objs)
if b'delta' in extrafields:
basenode = filerevision[b'deltabasenode']
delta = extrafields[b'delta']
elif b'revision' in extrafields:
basenode = nullid
revision = extrafields[b'revision']
delta = mdiff.trivialdiffheader(len(revision)) + revision
else:
continue
if b'linknode' in filerevision:
linknode = filerevision[b'linknode']
else:
linknode = node
yield (
node,
filerevision[b'parents'][0],
filerevision[b'parents'][1],
linknode,
basenode,
delta,
# Flags not yet supported.
0,
# Sidedata not yet supported.
{},
)
progress.increment()
remaining -= 1
progress = repo.ui.makeprogress(
_(b'files'),
unit=_(b'chunks'),
total=sum(len(v) for v in pycompat.itervalues(fnodes)),
)
commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
fields = {b'parents', b'revision'}
clrev = repo.changelog.rev
# There are no guarantees that we'll have ancestor revisions if
# a) this repo has shallow file storage b) shallow data fetching is enabled.
# Force remote to not delta against possibly unknown revisions when these
# conditions hold.
haveparents = not (shallowfiles or shallow)
# Similarly, we may not have calculated linkrevs for all incoming file
# revisions. Ask the remote to do work for us in this case.
if not haveparents:
fields.add(b'linknode')
for i in pycompat.xrange(0, len(csets), batchsize):
batch = [x for x in csets[i : i + batchsize]]
if not batch:
continue
with remote.commandexecutor() as e:
args = {
b'revisions': [
{
b'type': b'changesetexplicit',
b'nodes': batch,
}
],
b'fields': fields,
b'haveparents': haveparents,
}
if pathfilter:
args[b'pathfilter'] = pathfilter
objs = e.callcommand(b'filesdata', args).result()
# First object is an overall header.
overall = next(objs)
# We have overall['totalpaths'] segments.
for i in pycompat.xrange(overall[b'totalpaths']):
header = next(objs)
path = header[b'path']
store = repo.file(path)
linkrevs = {
fnode: manlinkrevs[mnode]
for fnode, mnode in pycompat.iteritems(fnodes[path])
}
def getlinkrev(node):
if node in linkrevs:
return linkrevs[node]
else:
return clrev(node)
store.addgroup(
iterrevisions(objs, header[b'totalitems'], progress),
getlinkrev,
weakref.proxy(tr),
maybemissingparents=shallow,
)