wireprotov2server.py
921 lines
| 29.9 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotov2server.py
Gregory Szorc
|
r37563 | # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net> | ||
# Copyright 2005-2007 Matt Mackall <mpm@selenic.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import contextlib | ||||
from .i18n import _ | ||||
Gregory Szorc
|
r39666 | from .node import ( | ||
Gregory Szorc
|
r39675 | hex, | ||
Gregory Szorc
|
r39666 | nullid, | ||
Gregory Szorc
|
r39673 | nullrev, | ||
Gregory Szorc
|
r39666 | ) | ||
Gregory Szorc
|
r37563 | from . import ( | ||
Gregory Szorc
|
r39673 | changegroup, | ||
dagop, | ||||
Gregory Szorc
|
r39666 | discovery, | ||
Gregory Szorc
|
r37564 | encoding, | ||
Gregory Szorc
|
r37563 | error, | ||
pycompat, | ||||
Gregory Szorc
|
r37675 | streamclone, | ||
Gregory Szorc
|
r37564 | util, | ||
Gregory Szorc
|
r37563 | wireprotoframing, | ||
wireprototypes, | ||||
) | ||||
Gregory Szorc
|
r37828 | from .utils import ( | ||
interfaceutil, | ||||
) | ||||
Gregory Szorc
|
r37563 | |||
Gregory Szorc
|
# Media type of the frame-based protocol. Requests must send this exact value
# in both the ``Accept`` and ``Content-Type`` headers, framed responses are
# labelled with it, and it is advertised under ``framingmediatypes`` in the
# capabilities.
FRAMINGTYPE = b'application/mercurial-exp-framing-0005'

# Protocol name reported by ``httpv2protocolhandler.name``.
HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2

# Registry of version 2 wire protocol commands, keyed by command name.
# Entries are added by the ``wireprotocommand`` decorator.
COMMANDS = wireprototypes.commanddict()
Gregory Szorc
|
def handlehttpv2request(rctx, req, res, checkperm, urlparts):
    """Validate an HTTP version 2 API request and hand it to the dispatcher.

    ``urlparts`` holds the URL path components. The URL space looks like
    ``<permission>/<command>`` where ``<permission>`` is ``ro`` (read-only)
    or ``rw`` (read-write).

    Every rejection is reported by populating ``res`` and returning. Once the
    permission, HTTP method, command name and media type headers have all
    been validated, the request is passed to ``_processhttpv2request()``.
    """
    from .hgweb import common as hgwebcommon

    def plaintext(status, body):
        # Most rejections share the same shape: status line, text/plain
        # content type and a short message.
        res.status = status
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(body)

    # Root URL does nothing meaningful... yet.
    if not urlparts:
        plaintext(b'200 OK', _('HTTP version 2 API handler'))
        return

    # A lone component cannot name both a permission and a command.
    if len(urlparts) == 1:
        plaintext(b'404 Not Found',
                  _('do not know how to process %s\n') % req.dispatchpath)
        return

    permission, command = urlparts[0:2]

    if permission not in (b'ro', b'rw'):
        plaintext(b'404 Not Found',
                  _('unknown permission: %s') % permission)
        return

    if req.method != 'POST':
        res.status = b'405 Method Not Allowed'
        res.headers[b'Allow'] = b'POST'
        res.setbodybytes(_('commands require POST requests'))
        return

    # At some point we'll want to use our own API instead of recycling the
    # behavior of version 1 of the wire protocol...
    # TODO return reasonable responses - not responses that overload the
    # HTTP status line message for error reporting.
    try:
        checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
    except hgwebcommon.ErrorResponse as e:
        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
        for header, value in e.headers:
            res.headers[header] = value
        res.setbodybytes('permission denied')
        return

    # We have a special endpoint to reflect the request back at the client.
    if command == b'debugreflect':
        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
        return

    # Extra commands that we handle that aren't really wire protocol
    # commands. Think extra hard before making this hackery available to
    # extension.
    extracommands = {'multirequest'}

    if not (command in COMMANDS or command in extracommands):
        plaintext(b'404 Not Found',
                  _('unknown wire protocol command: %s\n') % command)
        return

    repo = rctx.repo
    ui = repo.ui

    proto = httpv2protocolhandler(req, ui)

    if not (COMMANDS.commandavailable(command, proto)
            or command in extracommands):
        plaintext(b'404 Not Found',
                  _('invalid wire protocol command: %s') % command)
        return

    # TODO consider cases where proxies may add additional Accept headers.
    if req.headers.get(b'Accept') != FRAMINGTYPE:
        plaintext(b'406 Not Acceptable',
                  _('client MUST specify Accept header with value: %s\n')
                  % FRAMINGTYPE)
        return

    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
        # TODO we should send a response with appropriate media type,
        # since client does Accept it.
        plaintext(b'415 Unsupported Media Type',
                  _('client MUST send Content-Type header with '
                    'value: %s\n') % FRAMINGTYPE)
        return

    _processhttpv2request(ui, repo, req, res, permission, command, proto)
def _processhttpv2reflectrequest(ui, repo, req, res):
    """Read a unified frame protocol request and echo its state to the client.

    This debugging endpoint bypasses normal command dispatch. Every frame in
    the request body is decoded and fed to a server reactor; a log of what
    was received and how the reactor reacted is returned as a plain-text
    body. It is only served when ``experimental.web.api.debugreflect`` is
    enabled.
    """
    import json

    # Reflection APIs have a history of being abused, accidentally disclosing
    # sensitive data, etc. So we have a config knob.
    if not ui.configbool('experimental', 'web.api.debugreflect'):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_('debugreflect service not available'))
        return

    def tojson(value):
        return json.dumps(value, sort_keys=True, separators=(', ', ': '))

    # We assume we have a unified framing protocol request body.
    reactor = wireprotoframing.serverreactor()
    log = []

    frame = wireprotoframing.readframe(req.bodyfh)
    while frame:
        log.append(b'received: %d %d %d %s' % (
            frame.typeid, frame.flags, frame.requestid, frame.payload))

        action, meta = reactor.onframerecv(frame)
        log.append(tojson((action, meta)))

        frame = wireprotoframing.readframe(req.bodyfh)

    log.append(b'received: <no frame>')

    # Record the reactor's reaction to end of input as the final entry.
    action, meta = reactor.oninputeof()
    meta['action'] = action
    log.append(tojson(meta))

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'text/plain'
    res.setbodybytes(b'\n'.join(log))
def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
    """Run the frames of an already validated HTTPv2 request.

    Frames are read from the request body and fed to a server reactor, which
    decides for each one whether more data is needed, a command should run,
    or an error must be reported. Once input is exhausted, any frames the
    reactor deferred are attached to ``res``.
    """
    # TODO Some HTTP clients are full duplex and can receive data before
    # the entire request is transmitted. Figure out a way to indicate support
    # for that so we can opt into full duplex mode.
    reactor = wireprotoframing.serverreactor(deferoutput=True)
    outstream = reactor.makeoutputstream()
    seencommand = False

    while True:
        frame = wireprotoframing.readframe(req.bodyfh)
        if not frame:
            break

        action, meta = reactor.onframerecv(frame)

        if action == 'wantframe':
            # Need more data before we can do anything.
            continue

        if action == 'error':
            # TODO define proper error mechanism.
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(meta['message'] + b'\n')
            return

        if action != 'runcommand':
            raise error.ProgrammingError(
                'unhandled action from frame processor: %s' % action)

        # A true return value means the response has already been produced.
        if _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand,
                             reactor, outstream, meta,
                             issubsequent=seencommand):
            return

        seencommand = True

    action, meta = reactor.oninputeof()

    if action == 'noop':
        return

    if action != 'sendframes':
        raise error.ProgrammingError('unhandled action from frame processor: %s'
                                     % action)

    # We assume we haven't started sending the response yet. If we're
    # wrong, the response type will raise an exception.
    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE
    res.setbodygen(meta['framegen'])
def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
                      outstream, command, issubsequent):
    """Dispatch a wire protocol command made from HTTPv2 requests.

    The authenticated permission (``authedperm``) along with the original
    command from the URL (``reqcommand``) are passed in. ``command`` is the
    decoded command request from the frame stream; its ``command``, ``args``
    and ``requestid`` keys are read here. ``issubsequent`` is true when a
    command has already been run for this HTTP request.

    Returns True when a response has been set on ``res`` (either an error or
    the command's frames) and False when the reactor deferred its output.

    Raises ``error.ProgrammingError`` if ``authedperm`` or the registered
    command's permission holds an unexpected value, or if the reactor
    returns an action this function does not know about.
    """
    # We already validated that the session has permissions to perform the
    # actions in ``authedperm``. In the unified frame protocol, the canonical
    # command to run is expressed in a frame. However, the URL also requested
    # to run a specific command. We need to be careful that the command we
    # run doesn't have permissions requirements greater than what was granted
    # by ``authedperm``.
    #
    # Our rule for this is we only allow one command per HTTP request and
    # that command must match the command in the URL. However, we make
    # an exception for the ``multirequest`` URL. This URL is allowed to
    # execute multiple commands. We double check permissions of each command
    # as it is invoked to ensure there is no privilege escalation.
    # TODO consider allowing multiple commands to regular command URLs
    # iff each command is the same.

    proto = httpv2protocolhandler(req, ui, args=command['args'])

    if reqcommand == b'multirequest':
        if not COMMANDS.commandavailable(command['command'], proto):
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('wire protocol command not available: %s') %
                             command['command'])
            return True

        # These invariants guard against privilege escalation, so they are
        # checked with explicit raises: ``assert`` statements are removed
        # when Python runs with -O.
        if authedperm not in (b'ro', b'rw'):
            raise error.ProgrammingError(
                'unexpected authenticated permission: %s' % authedperm)

        wirecommand = COMMANDS[command['command']]

        if wirecommand.permission not in ('push', 'pull'):
            raise error.ProgrammingError(
                'unexpected wire protocol command permission: %s' %
                wirecommand.permission)

        if authedperm == b'ro' and wirecommand.permission != 'pull':
            # TODO proper error mechanism
            res.status = b'403 Forbidden'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('insufficient permissions to execute '
                               'command: %s') % command['command'])
            return True

        # TODO should we also call checkperm() here? Maybe not if we're going
        # to overhaul that API. The granted scope from the URL check should
        # be good enough.

    else:
        # Don't allow multiple commands outside of ``multirequest`` URL.
        if issubsequent:
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('multiple commands cannot be issued to this '
                               'URL'))
            return True

        if reqcommand != command['command']:
            # TODO define proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('command in frame must match command in URL'))
            return True

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE

    # Any exception from dispatching or from handing the result to the
    # reactor is reported to the client as a server error.
    try:
        objs = dispatch(repo, proto, command['command'])

        action, meta = reactor.oncommandresponsereadyobjects(
            outstream, command['requestid'], objs)

    except Exception as e:
        action, meta = reactor.onservererror(
            outstream, command['requestid'],
            _('exception when invoking command: %s') % e)

    if action == 'sendframes':
        res.setbodygen(meta['framegen'])
        return True
    elif action == 'noop':
        return False
    else:
        raise error.ProgrammingError('unhandled event from reactor: %s' %
                                     action)
Gregory Szorc
|
def getdispatchrepo(repo, proto, command):
    """Return the repository view that commands are dispatched against.

    ``proto`` and ``command`` are not used by this implementation; the
    ``served`` filtered view of ``repo`` is always returned.
    """
    servedview = repo.filtered('served')
    return servedview
def dispatch(repo, proto, command):
    """Invoke the registered wire protocol command named ``command``.

    The command runs against the view returned by ``getdispatchrepo()`` and
    receives the arguments that ``proto`` resolves from the command's
    declared argument spec. The command function's return value is returned
    unchanged.
    """
    target = getdispatchrepo(repo, proto, command)

    entry = COMMANDS[command]
    func, spec = entry

    kwargs = proto.getargs(spec)

    return func(target, proto, **kwargs)
Gregory Szorc
|
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv2protocolhandler(object):
    """Protocol handler for the HTTP version 2 transport.

    Holds the request, the ui and (optionally) the decoded arguments of the
    command being run. Several interface methods are not implemented yet and
    raise ``NotImplementedError``.
    """

    def __init__(self, req, ui, args=None):
        self._args = args
        self._req = req
        self._ui = ui

    @property
    def name(self):
        """Name identifying this protocol."""
        return HTTP_WIREPROTO_V2

    def getargs(self, args):
        """Return the received values for the argument names in ``args``.

        ``args`` maps declared argument names to their spec. Names the
        client did not send are left out of the result. A ``*`` argument is
        not supported and raises ``NotImplementedError``.
        """
        resolved = {}

        for name in args:
            if name == '*':
                raise NotImplementedError('do not support * args')

            if name not in self._args:
                continue

            # TODO consider validating value types.
            resolved[name] = self._args[name]

        return resolved

    def getprotocaps(self):
        # Protocol capabilities are currently not implemented for HTTP V2.
        return set()

    def getpayload(self):
        raise NotImplementedError

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        raise NotImplementedError

    def client(self):
        raise NotImplementedError

    def addcapabilities(self, repo, caps):
        # Nothing transport-specific to add; hand the capabilities back.
        return caps

    def checkperm(self, perm):
        raise NotImplementedError
Gregory Szorc
|
r37564 | |||
Gregory Szorc
|
def httpv2apidescriptor(req, repo):
    """Return the version 2 capabilities of ``repo`` for request ``req``."""
    return _capabilitiesv2(repo, httpv2protocolhandler(req, repo.ui))
Gregory Szorc
|
def _capabilitiesv2(repo, proto):
    """Obtain the set of capabilities for version 2 transports.

    These capabilities are distinct from the capabilities for version 1
    transports.

    The result describes the registered commands (arguments and required
    permission), the supported compression engines and framing media types
    and, when stream clone generation is allowed, the raw repository
    formats. ``proto.addcapabilities()`` gets the final say on the result.
    """
    engines = wireprototypes.supportedcompengines(repo.ui, util.SERVERROLE)
    compression = [{b'name': engine.wireprotosupport().name}
                   for engine in engines]

    # TODO expose available changesetdata fields.
    commands = {
        command: {
            'args': entry.args,
            'permissions': [entry.permission],
        }
        for command, entry in COMMANDS.items()
    }

    caps = {
        'commands': commands,
        'compression': compression,
        'framingmediatypes': [FRAMINGTYPE],
    }

    if streamclone.allowservergeneration(repo):
        formats = repo.requirements & repo.supportedformats
        caps['rawrepoformats'] = sorted(formats)

    return proto.addcapabilities(repo, caps)
Gregory Szorc
|
def builddeltarequests(store, nodes, haveparents):
    """Build a series of revision delta requests against a backend store.

    ``nodes`` are the nodes to emit and ``haveparents`` says whether the
    receiver claims to already have the parents of those revisions.

    Returns a list of revision numbers in the order they should be sent
    and a list of ``irevisiondeltarequest`` instances to be made against
    the backend store.
    """
    # We sort and send nodes in DAG order because this is optimal for
    # storage emission.
    # TODO we may want a better storage API here - one where we can throw
    # a list of nodes and delta preconditions over a figurative wall and
    # have the storage backend figure it out for us.
    revs = dagop.linearize({store.rev(n) for n in nodes}, store.parentrevs)

    requests = []
    emitted = set()

    for rev in revs:
        node = store.node(rev)
        parentnodes = store.parents(node)
        p1node, p2node = parentnodes[0], parentnodes[1]
        p1rev = store.rev(p1node)
        p2rev = store.rev(p2node)
        storedbaserev = store.deltaparent(rev)
        storedbasenode = store.node(storedbaserev)

        # A delta is only useful if the receiver can decode it, which
        # requires the receiver to have the base revision. We can only
        # guarantee that when
        #
        # a) the base was already sent as part of this group, or
        # b) the base is a parent and the receiver said it has the parents.
        #
        # The delta held by storage is preferred whenever its base meets one
        # of those conditions.
        storagedeltaok = storedbaserev != nullrev and (
            storedbaserev in emitted
            or (storedbaserev in (p1rev, p2rev) and haveparents))

        if storagedeltaok:
            basenode = storedbasenode
        # Otherwise the storage delta isn't appropriate. Fall back to
        # another delta if possible: p1 first, then p2, each usable when we
        # have emitted it or the receiver says it has it.
        elif p1rev != nullrev and (p1rev in emitted or haveparents):
            basenode = p1node
        elif p2rev != nullrev and (p2rev in emitted or haveparents):
            basenode = p2node
        # Nothing appropriate to delta against. Send the full revision.
        else:
            basenode = nullid

        request = changegroup.revisiondeltarequest(
            node=node,
            p1node=p1node,
            p2node=p2node,
            # Receiver deals with linknode resolution.
            linknode=nullid,
            basenode=basenode,
        )
        requests.append(request)

        emitted.add(rev)

    return revs, requests
Gregory Szorc
|
def wireprotocommand(name, args=None, permission='push'):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` is a dict of argument names to example values.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
    respectively. Default is to assume command requires ``push`` permissions
    because otherwise commands not declaring their permissions could modify
    a repository that is supposed to be read-only.

    Wire protocol commands are generators of objects to be serialized and
    sent to the client.

    If a command raises an uncaught exception, this will be translated into
    a command error.

    Raises ``error.ProgrammingError`` for an invalid ``permission``, for
    ``args`` that is not a dict, or (at decoration time) when ``name`` has
    already been registered.
    """
    # The command is exposed on every transport that speaks version 2.
    transports = set()
    for transport, details in wireprototypes.TRANSPORTS.items():
        if details['version'] == 2:
            transports.add(transport)

    if permission not in ('push', 'pull'):
        raise error.ProgrammingError('invalid wire protocol permission; '
                                     'got %s; expected "push" or "pull"' %
                                     permission)

    args = {} if args is None else args

    if not isinstance(args, dict):
        raise error.ProgrammingError('arguments for version 2 commands '
                                     'must be declared as dicts')

    def register(func):
        if name in COMMANDS:
            raise error.ProgrammingError('%s command already registered '
                                         'for version 2' % name)

        entry = wireprototypes.commandentry(
            func, args=args, transports=transports, permission=permission)
        COMMANDS[name] = entry

        return func

    return register
@wireprotocommand('branchmap', permission='pull')
def branchmapv2(repo, proto):
    """Emit the repository branch map with branch names encoded for the wire."""
    branches = {}
    for branch, value in repo.branchmap().iteritems():
        branches[encoding.fromlocal(branch)] = value

    yield branches
Gregory Szorc
|
r37564 | |||
@wireprotocommand('capabilities', permission='pull')
def capabilitiesv2(repo, proto):
    """Emit the version 2 capabilities of this server."""
    caps = _capabilitiesv2(repo, proto)
    yield caps
Gregory Szorc
|
r37564 | |||
Gregory Szorc
|
@wireprotocommand('changesetdata',
                  args={
                      'noderange': [[b'0123456...'], [b'abcdef...']],
                      'nodes': [b'0123456...'],
                      'fields': {b'parents', b'revision'},
                  },
                  permission='pull')
def changesetdata(repo, proto, noderange=None, nodes=None, fields=None):
    """Emit data about changesets.

    Changesets are selected with ``nodes`` (explicit nodes; unknown ones are
    dropped), with ``noderange`` (a pair of common nodes and heads, resolved
    through discovery), or both. ``fields`` picks the data sent per
    changeset: ``parents``, ``phase``, ``bookmarks`` and ``revision`` are
    recognized here.

    The first object emitted is a map holding ``totalitems``. Each changeset
    then yields a map describing it, followed by the raw revision data when
    ``revision`` was requested.

    Raises ``error.WireprotoCommandError`` when neither selector is given or
    ``noderange`` is malformed.
    """
    fields = fields or set()

    # TODO look for unknown fields and abort when they can't be serviced.

    if nodes is None and noderange is None:
        raise error.WireprotoCommandError(
            'noderange or nodes must be defined')

    if noderange is not None:
        if len(noderange) != 2:
            raise error.WireprotoCommandError(
                'noderange must consist of 2 elements')

        if not noderange[1]:
            raise error.WireprotoCommandError(
                'heads in noderange request cannot be empty')

    cl = repo.changelog
    hasnode = cl.hasnode

    # Explicitly requested nodes come first; ``seen`` keeps the range
    # expansion below from listing any of them twice.
    if nodes is not None:
        outgoing = [n for n in nodes if hasnode(n)]
    else:
        outgoing = []
    seen = set(outgoing)

    if noderange is not None:
        if noderange[0]:
            common = [n for n in noderange[0] if hasnode(n)]
        else:
            common = [nullid]

        missing = discovery.outgoing(repo, common, noderange[1]).missing
        # Don't need to add to seen here because this is the final
        # source of nodes and there should be no duplicates in this
        # list.
        outgoing.extend(n for n in missing if n not in seen)

    seen.clear()
    publishing = repo.publishing()

    if outgoing:
        repo.hook('preoutgoing', throw=True, source='serve')

    yield {
        b'totalitems': len(outgoing),
    }

    wantparents = b'parents' in fields
    wantphase = b'phase' in fields
    wantbookmarks = b'bookmarks' in fields
    wantrevision = b'revision' in fields

    # The phases of nodes already transferred to the client may have changed
    # since the client last requested data. We send phase-only records
    # for these revisions, if requested.
    if wantphase and noderange is not None:
        # TODO skip nodes whose phase will be reflected by a node in the
        # outgoing set. This is purely an optimization to reduce data
        # size.
        for node in noderange[0]:
            yield {
                b'node': node,
                b'phase': b'public' if publishing else repo[node].phasestr()
            }

    # Group bookmark names by the node they point at.
    nodebookmarks = {}
    for mark, node in repo._bookmarks.items():
        if node not in nodebookmarks:
            nodebookmarks[node] = set()
        nodebookmarks[node].add(mark)

    # It is already topologically sorted by revision number.
    for node in outgoing:
        record = {
            b'node': node,
        }

        if wantparents:
            record[b'parents'] = cl.parents(node)

        if wantphase:
            if publishing:
                record[b'phase'] = b'public'
            else:
                record[b'phase'] = repo[node].phasestr()

        if wantbookmarks and node in nodebookmarks:
            # Popping leaves only bookmarks on nodes that were not emitted.
            record[b'bookmarks'] = sorted(nodebookmarks.pop(node))

        if wantrevision:
            revisiondata = cl.revision(node, raw=True)
            record[b'revisionsize'] = len(revisiondata)
        else:
            revisiondata = None

        # TODO make it possible for extensions to wrap a function or register
        # a handler to service custom fields.

        yield record

        if revisiondata is not None:
            yield revisiondata

    # If requested, send bookmarks from nodes that didn't have revision
    # data sent so receiver is aware of any bookmark updates.
    if wantbookmarks:
        for node, marks in sorted(nodebookmarks.iteritems()):
            yield {
                b'node': node,
                b'bookmarks': sorted(marks),
            }
Gregory Szorc
|
class FileAccessError(Exception):
    """Represents an error accessing a specific file.

    ``path`` is the file that could not be accessed; ``msg`` is a message
    template and ``args`` the values meant to be formatted into it.
    """

    def __init__(self, path, msg, args):
        self.path, self.msg, self.args = path, msg, args
def getfilestore(repo, proto, path):
    """Obtain a file storage object for use with wire protocol.

    Exists as a standalone function so extensions can monkeypatch to add
    access control.

    Raises ``FileAccessError`` when the storage for ``path`` is empty.
    """
    store = repo.file(path)

    # This seems to work even if the file doesn't exist. So catch
    # "empty" files and return an error.
    if len(store) == 0:
        raise FileAccessError(path, 'unknown file: %s', (path,))

    return store
@wireprotocommand('filedata',
                  args={
                      'haveparents': True,
                      'nodes': [b'0123456...'],
                      'fields': [b'parents', b'revision'],
                      'path': b'foo.txt',
                  },
                  permission='pull')
def filedata(repo, proto, haveparents=False, nodes=None, fields=None,
             path=None):
    """Emit data for revisions of a single tracked file.

    ``path`` names the file and ``nodes`` lists the file nodes to send.
    ``haveparents`` says whether the receiver already has the parents of
    those revisions, which widens the set of deltas that may be sent.
    ``fields`` picks the data sent per revision (``parents``, ``revision``).

    The first object emitted is a map holding ``totalitems``. Each revision
    then yields a map describing it, followed by the fulltext or delta
    bytes when ``revision`` was requested.

    Raises ``error.WireprotoCommandError`` when a required argument is
    missing or when the file or one of the nodes is unknown.
    """
    fields = fields or set()

    if nodes is None:
        raise error.WireprotoCommandError('nodes argument must be defined')

    if path is None:
        raise error.WireprotoCommandError('path argument must be defined')

    try:
        # Extensions may wish to access the protocol handler.
        store = getfilestore(repo, proto, path)
    except FileAccessError as e:
        raise error.WireprotoCommandError(e.msg, e.args)

    # Validate requested nodes.
    for node in nodes:
        try:
            store.rev(node)
        except error.LookupError:
            raise error.WireprotoCommandError('unknown file node: %s',
                                              (hex(node),))

    revs, requests = builddeltarequests(store, nodes, haveparents)

    yield {
        b'totalitems': len(revs),
    }

    if b'revision' in fields:
        deltas = store.emitrevisiondeltas(requests)
    else:
        deltas = None

    for rev in revs:
        node = store.node(rev)

        # Deltas are emitted in the same order as ``revs``, so consume one
        # per revision.
        if deltas is not None:
            delta = next(deltas)
        else:
            delta = None

        d = {
            b'node': node,
        }

        if b'parents' in fields:
            d[b'parents'] = store.parents(node)

        if b'revision' in fields:
            assert delta is not None
            assert delta.flags == 0
            assert d[b'node'] == delta.node

            if delta.revision is not None:
                revisiondata = delta.revision
                d[b'revisionsize'] = len(revisiondata)
            else:
                d[b'deltabasenode'] = delta.basenode
                revisiondata = delta.delta
                d[b'deltasize'] = len(revisiondata)
        else:
            revisiondata = None

        yield d

        if revisiondata is not None:
            yield revisiondata

    # The delta stream must be exhausted by now. An exhausted iterator
    # signals that by raising StopIteration from next(); GeneratorExit is
    # never raised there. Letting StopIteration escape this generator would
    # become a RuntimeError on Python 3.7+.
    if deltas is not None:
        try:
            next(deltas)
        except StopIteration:
            pass
        else:
            raise error.ProgrammingError('should not have more deltas')
Gregory Szorc
|
@wireprotocommand('heads',
                  args={
                      'publiconly': False,
                  },
                  permission='pull')
def headsv2(repo, proto, publiconly=False):
    """Emit the repository heads.

    With ``publiconly`` set, heads are computed on the ``immutable``
    filtered view of the repository.
    """
    source = repo.filtered('immutable') if publiconly else repo

    yield source.heads()
Gregory Szorc
|
r37564 | |||
@wireprotocommand('known',
                  args={
                      'nodes': [b'deadbeef'],
                  },
                  permission='pull')
def knownv2(repo, proto, nodes=None):
    """Emit one byte per requested node: ``1`` if known, ``0`` otherwise."""
    if not nodes:
        nodes = []

    flags = [b'1' if isknown else b'0' for isknown in repo.known(nodes)]

    yield b''.join(flags)
Gregory Szorc
|
r37564 | |||
@wireprotocommand('listkeys',
                  args={
                      'namespace': b'ns',
                  },
                  permission='pull')
def listkeysv2(repo, proto, namespace=None):
    """Emit the keys and values of a pushkey namespace, encoded for the wire."""
    localkeys = repo.listkeys(encoding.tolocal(namespace))

    wirekeys = {}
    for key, value in localkeys.iteritems():
        wirekey = encoding.fromlocal(key)
        wirekeys[wirekey] = encoding.fromlocal(value)

    yield wirekeys
Gregory Szorc
|
r37564 | |||
@wireprotocommand('lookup',
                  args={
                      'key': b'foo',
                  },
                  permission='pull')
def lookupv2(repo, proto, key):
    """Emit the node that ``key`` resolves to in the repository."""
    # TODO handle exception.
    yield repo.lookup(encoding.tolocal(key))
Gregory Szorc
|
r37564 | |||
Gregory Szorc
|
@wireprotocommand('manifestdata',
                  args={
                      'nodes': [b'0123456...'],
                      'haveparents': True,
                      'fields': [b'parents', b'revision'],
                      'tree': b'',
                  },
                  permission='pull')
def manifestdata(repo, proto, haveparents=False, nodes=None, fields=None,
                 tree=None):
    """Emit data for manifest revisions.

    ``tree`` selects the manifest storage and ``nodes`` lists the manifest
    nodes to send. ``haveparents`` says whether the receiver already has the
    parents of those revisions, which widens the set of deltas that may be
    sent. ``fields`` picks the data sent per revision (``parents``,
    ``revision``).

    The first object emitted is a map holding ``totalitems``. Each revision
    then yields a map describing it, followed by the fulltext or delta
    bytes when ``revision`` was requested.

    Raises ``error.WireprotoCommandError`` when a required argument is
    missing or one of the nodes is unknown.
    """
    fields = fields or set()

    if nodes is None:
        raise error.WireprotoCommandError(
            'nodes argument must be defined')

    if tree is None:
        raise error.WireprotoCommandError(
            'tree argument must be defined')

    store = repo.manifestlog.getstorage(tree)

    # Validate the node is known and abort on unknown revisions.
    for node in nodes:
        try:
            store.rev(node)
        except error.LookupError:
            raise error.WireprotoCommandError(
                'unknown node: %s', (node,))

    revs, requests = builddeltarequests(store, nodes, haveparents)

    yield {
        b'totalitems': len(revs),
    }

    if b'revision' in fields:
        deltas = store.emitrevisiondeltas(requests)
    else:
        deltas = None

    for rev in revs:
        node = store.node(rev)

        # Deltas are emitted in the same order as ``revs``, so consume one
        # per revision.
        if deltas is not None:
            delta = next(deltas)
        else:
            delta = None

        d = {
            b'node': node,
        }

        if b'parents' in fields:
            d[b'parents'] = store.parents(node)

        if b'revision' in fields:
            assert delta is not None
            assert delta.flags == 0
            assert d[b'node'] == delta.node

            if delta.revision is not None:
                revisiondata = delta.revision
                d[b'revisionsize'] = len(revisiondata)
            else:
                d[b'deltabasenode'] = delta.basenode
                revisiondata = delta.delta
                d[b'deltasize'] = len(revisiondata)
        else:
            revisiondata = None

        yield d

        if revisiondata is not None:
            yield revisiondata

    # The delta stream must be exhausted by now. An exhausted iterator
    # signals that by raising StopIteration from next(); GeneratorExit is
    # never raised there. Letting StopIteration escape this generator would
    # become a RuntimeError on Python 3.7+.
    if deltas is not None:
        try:
            next(deltas)
        except StopIteration:
            pass
        else:
            raise error.ProgrammingError('should not have more deltas')
Gregory Szorc
|
@wireprotocommand('pushkey',
                  args={
                      'namespace': b'ns',
                      'key': b'key',
                      'old': b'old',
                      'new': b'new',
                  },
                  permission='push')
def pushkeyv2(repo, proto, namespace, key, old, new):
    """Update ``key`` in a pushkey namespace and emit the result.

    All four values are converted to the local encoding before being passed
    to ``repo.pushkey()``.
    """
    # TODO handle ui output redirection
    localnamespace = encoding.tolocal(namespace)
    localkey = encoding.tolocal(key)
    localold = encoding.tolocal(old)
    localnew = encoding.tolocal(new)

    yield repo.pushkey(localnamespace, localkey, localold, localnew)