wireprotov1peer.py
615 lines
| 20.9 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotov1peer.py
Gregory Szorc
|
r37632 | # wireprotov1peer.py - Client-side functionality for wire protocol version 1. | ||
# | ||||
# Copyright 2005-2010 Matt Mackall <mpm@selenic.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import hashlib | ||||
Gregory Szorc
|
r37648 | import sys | ||
Gregory Szorc
|
r37649 | import weakref | ||
Gregory Szorc
|
r37632 | |||
from .i18n import _ | ||||
from .node import ( | ||||
bin, | ||||
) | ||||
Gregory Szorc
|
r37648 | from .thirdparty.zope import ( | ||
interface as zi, | ||||
) | ||||
Gregory Szorc
|
r37632 | from . import ( | ||
bundle2, | ||||
changegroup as changegroupmod, | ||||
encoding, | ||||
error, | ||||
pushkey as pushkeymod, | ||||
pycompat, | ||||
repository, | ||||
util, | ||||
wireprototypes, | ||||
) | ||||
urlreq = util.urlreq | ||||
Gregory Szorc
|
r37633 | def batchable(f): | ||
'''annotation for batchable methods | ||||
Such methods must implement a coroutine as follows: | ||||
@batchable | ||||
def sample(self, one, two=None): | ||||
# Build list of encoded arguments suitable for your wire protocol: | ||||
encargs = [('one', encode(one),), ('two', encode(two),)] | ||||
# Create future for injection of encoded result: | ||||
encresref = future() | ||||
# Return encoded arguments and future: | ||||
yield encargs, encresref | ||||
# Assuming the future to be filled with the result from the batched | ||||
# request now. Decode it: | ||||
yield decode(encresref.value) | ||||
The decorator returns a function which wraps this coroutine as a plain | ||||
method, but adds the original method as an attribute called "batchable", | ||||
which is used by remotebatch to split the call into separate encoding and | ||||
decoding phases. | ||||
''' | ||||
def plain(*args, **opts): | ||||
batchable = f(*args, **opts) | ||||
encargsorres, encresref = next(batchable) | ||||
if not encresref: | ||||
return encargsorres # a local result in this case | ||||
self = args[0] | ||||
cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr | ||||
encresref.set(self._submitone(cmd, encargsorres)) | ||||
return next(batchable) | ||||
setattr(plain, 'batchable', f) | ||||
return plain | ||||
class future(object): | ||||
'''placeholder for a value to be set later''' | ||||
def set(self, value): | ||||
if util.safehasattr(self, 'value'): | ||||
raise error.RepoError("future is already set") | ||||
self.value = value | ||||
Gregory Szorc
|
r37632 | def encodebatchcmds(req): | ||
"""Return a ``cmds`` argument value for the ``batch`` command.""" | ||||
escapearg = wireprototypes.escapebatcharg | ||||
cmds = [] | ||||
for op, argsdict in req: | ||||
# Old servers didn't properly unescape argument names. So prevent | ||||
# the sending of argument names that may not be decoded properly by | ||||
# servers. | ||||
assert all(escapearg(k) == k for k in argsdict) | ||||
args = ','.join('%s=%s' % (escapearg(k), escapearg(v)) | ||||
for k, v in argsdict.iteritems()) | ||||
cmds.append('%s %s' % (op, args)) | ||||
return ';'.join(cmds) | ||||
Gregory Szorc
|
r37649 | class unsentfuture(pycompat.futures.Future): | ||
"""A Future variation to represent an unsent command. | ||||
Because we buffer commands and don't submit them immediately, calling | ||||
``result()`` on an unsent future could deadlock. Futures for buffered | ||||
commands are represented by this type, which wraps ``result()`` to | ||||
call ``sendcommands()``. | ||||
""" | ||||
def result(self, timeout=None): | ||||
if self.done(): | ||||
return pycompat.futures.Future.result(self, timeout) | ||||
self._peerexecutor.sendcommands() | ||||
# This looks like it will infinitely recurse. However, | ||||
# sendcommands() should modify __class__. This call serves as a check | ||||
# on that. | ||||
return self.result(timeout) | ||||
Gregory Szorc
|
r37648 | @zi.implementer(repository.ipeercommandexecutor) | ||
class peerexecutor(object): | ||||
def __init__(self, peer): | ||||
self._peer = peer | ||||
self._sent = False | ||||
self._closed = False | ||||
self._calls = [] | ||||
Gregory Szorc
|
r37649 | self._futures = weakref.WeakSet() | ||
self._responseexecutor = None | ||||
self._responsef = None | ||||
Gregory Szorc
|
r37648 | |||
def __enter__(self): | ||||
return self | ||||
def __exit__(self, exctype, excvalee, exctb): | ||||
self.close() | ||||
def callcommand(self, command, args): | ||||
if self._sent: | ||||
raise error.ProgrammingError('callcommand() cannot be used ' | ||||
'after commands are sent') | ||||
if self._closed: | ||||
raise error.ProgrammingError('callcommand() cannot be used ' | ||||
'after close()') | ||||
# Commands are dispatched through methods on the peer. | ||||
fn = getattr(self._peer, pycompat.sysstr(command), None) | ||||
if not fn: | ||||
raise error.ProgrammingError( | ||||
'cannot call command %s: method of same name not available ' | ||||
'on peer' % command) | ||||
# Commands are either batchable or they aren't. If a command | ||||
# isn't batchable, we send it immediately because the executor | ||||
# can no longer accept new commands after a non-batchable command. | ||||
Gregory Szorc
|
r37649 | # If a command is batchable, we queue it for later. But we have | ||
# to account for the case of a non-batchable command arriving after | ||||
# a batchable one and refuse to service it. | ||||
def addcall(): | ||||
f = pycompat.futures.Future() | ||||
self._futures.add(f) | ||||
self._calls.append((command, args, fn, f)) | ||||
return f | ||||
Gregory Szorc
|
r37648 | |||
if getattr(fn, 'batchable', False): | ||||
Gregory Szorc
|
r37649 | f = addcall() | ||
# But since we don't issue it immediately, we wrap its result() | ||||
# to trigger sending so we avoid deadlocks. | ||||
f.__class__ = unsentfuture | ||||
f._peerexecutor = self | ||||
Gregory Szorc
|
r37648 | else: | ||
if self._calls: | ||||
raise error.ProgrammingError( | ||||
'%s is not batchable and cannot be called on a command ' | ||||
'executor along with other commands' % command) | ||||
Gregory Szorc
|
r37649 | f = addcall() | ||
# Non-batchable commands can never coexist with another command | ||||
# in this executor. So send the command immediately. | ||||
self.sendcommands() | ||||
Gregory Szorc
|
r37648 | return f | ||
def sendcommands(self): | ||||
if self._sent: | ||||
return | ||||
if not self._calls: | ||||
return | ||||
self._sent = True | ||||
Gregory Szorc
|
r37649 | # Unhack any future types so caller seens a clean type and to break | ||
# cycle between us and futures. | ||||
for f in self._futures: | ||||
if isinstance(f, unsentfuture): | ||||
f.__class__ = pycompat.futures.Future | ||||
f._peerexecutor = None | ||||
Gregory Szorc
|
r37648 | calls = self._calls | ||
# Mainly to destroy references to futures. | ||||
self._calls = None | ||||
Gregory Szorc
|
r37649 | # Simple case of a single command. We call it synchronously. | ||
Gregory Szorc
|
r37648 | if len(calls) == 1: | ||
command, args, fn, f = calls[0] | ||||
# Future was cancelled. Ignore it. | ||||
if not f.set_running_or_notify_cancel(): | ||||
return | ||||
try: | ||||
result = fn(**pycompat.strkwargs(args)) | ||||
except Exception: | ||||
f.set_exception_info(*sys.exc_info()[1:]) | ||||
else: | ||||
f.set_result(result) | ||||
return | ||||
Gregory Szorc
|
r37649 | # Batch commands are a bit harder. First, we have to deal with the | ||
# @batchable coroutine. That's a bit annoying. Furthermore, we also | ||||
# need to preserve streaming. i.e. it should be possible for the | ||||
# futures to resolve as data is coming in off the wire without having | ||||
# to wait for the final byte of the final response. We do this by | ||||
# spinning up a thread to read the responses. | ||||
requests = [] | ||||
states = [] | ||||
for command, args, fn, f in calls: | ||||
# Future was cancelled. Ignore it. | ||||
if not f.set_running_or_notify_cancel(): | ||||
continue | ||||
try: | ||||
batchable = fn.batchable(fn.__self__, | ||||
**pycompat.strkwargs(args)) | ||||
except Exception: | ||||
f.set_exception_info(*sys.exc_info()[1:]) | ||||
return | ||||
# Encoded arguments and future holding remote result. | ||||
try: | ||||
encodedargs, fremote = next(batchable) | ||||
except Exception: | ||||
f.set_exception_info(*sys.exc_info()[1:]) | ||||
return | ||||
requests.append((command, encodedargs)) | ||||
states.append((command, f, batchable, fremote)) | ||||
if not requests: | ||||
return | ||||
# This will emit responses in order they were executed. | ||||
wireresults = self._peer._submitbatch(requests) | ||||
# The use of a thread pool executor here is a bit weird for something | ||||
# that only spins up a single thread. However, thread management is | ||||
# hard and it is easy to encounter race conditions, deadlocks, etc. | ||||
# concurrent.futures already solves these problems and its thread pool | ||||
# executor has minimal overhead. So we use it. | ||||
self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | ||||
self._responsef = self._responseexecutor.submit(self._readbatchresponse, | ||||
states, wireresults) | ||||
Gregory Szorc
|
r37648 | |||
def close(self): | ||||
self.sendcommands() | ||||
Gregory Szorc
|
r37649 | if self._closed: | ||
return | ||||
Gregory Szorc
|
r37648 | self._closed = True | ||
Gregory Szorc
|
r37649 | if not self._responsef: | ||
return | ||||
# We need to wait on our in-flight response and then shut down the | ||||
# executor once we have a result. | ||||
try: | ||||
self._responsef.result() | ||||
finally: | ||||
self._responseexecutor.shutdown(wait=True) | ||||
self._responsef = None | ||||
self._responseexecutor = None | ||||
# If any of our futures are still in progress, mark them as | ||||
# errored. Otherwise a result() could wait indefinitely. | ||||
for f in self._futures: | ||||
if not f.done(): | ||||
f.set_exception(error.ResponseError( | ||||
_('unfulfilled batch command response'))) | ||||
self._futures = None | ||||
def _readbatchresponse(self, states, wireresults): | ||||
# Executes in a thread to read data off the wire. | ||||
for command, f, batchable, fremote in states: | ||||
# Grab raw result off the wire and teach the internal future | ||||
# about it. | ||||
remoteresult = next(wireresults) | ||||
fremote.set(remoteresult) | ||||
# And ask the coroutine to decode that value. | ||||
try: | ||||
result = next(batchable) | ||||
except Exception: | ||||
f.set_exception_info(*sys.exc_info()[1:]) | ||||
else: | ||||
f.set_result(result) | ||||
Gregory Szorc
|
r37653 | @zi.implementer(repository.ipeerlegacycommands) | ||
class wirepeer(repository.peer): | ||||
Gregory Szorc
|
r37632 | """Client-side interface for communicating with a peer repository. | ||
Methods commonly call wire protocol commands of the same name. | ||||
See also httppeer.py and sshpeer.py for protocol-specific | ||||
implementations of this interface. | ||||
""" | ||||
Gregory Szorc
|
r37648 | def commandexecutor(self): | ||
return peerexecutor(self) | ||||
Gregory Szorc
|
r37632 | # Begin of ipeercommands interface. | ||
@batchable | ||||
def lookup(self, key): | ||||
self.requirecap('lookup', _('look up remote revision')) | ||||
f = future() | ||||
yield {'key': encoding.fromlocal(key)}, f | ||||
d = f.value | ||||
success, data = d[:-1].split(" ", 1) | ||||
if int(success): | ||||
yield bin(data) | ||||
else: | ||||
self._abort(error.RepoError(data)) | ||||
@batchable | ||||
def heads(self): | ||||
f = future() | ||||
yield {}, f | ||||
d = f.value | ||||
try: | ||||
yield wireprototypes.decodelist(d[:-1]) | ||||
except ValueError: | ||||
self._abort(error.ResponseError(_("unexpected response:"), d)) | ||||
@batchable | ||||
def known(self, nodes): | ||||
f = future() | ||||
yield {'nodes': wireprototypes.encodelist(nodes)}, f | ||||
d = f.value | ||||
try: | ||||
yield [bool(int(b)) for b in d] | ||||
except ValueError: | ||||
self._abort(error.ResponseError(_("unexpected response:"), d)) | ||||
@batchable | ||||
def branchmap(self): | ||||
f = future() | ||||
yield {}, f | ||||
d = f.value | ||||
try: | ||||
branchmap = {} | ||||
for branchpart in d.splitlines(): | ||||
branchname, branchheads = branchpart.split(' ', 1) | ||||
branchname = encoding.tolocal(urlreq.unquote(branchname)) | ||||
branchheads = wireprototypes.decodelist(branchheads) | ||||
branchmap[branchname] = branchheads | ||||
yield branchmap | ||||
except TypeError: | ||||
self._abort(error.ResponseError(_("unexpected response:"), d)) | ||||
@batchable | ||||
def listkeys(self, namespace): | ||||
if not self.capable('pushkey'): | ||||
yield {}, None | ||||
f = future() | ||||
self.ui.debug('preparing listkeys for "%s"\n' % namespace) | ||||
yield {'namespace': encoding.fromlocal(namespace)}, f | ||||
d = f.value | ||||
self.ui.debug('received listkey for "%s": %i bytes\n' | ||||
% (namespace, len(d))) | ||||
yield pushkeymod.decodekeys(d) | ||||
@batchable | ||||
def pushkey(self, namespace, key, old, new): | ||||
if not self.capable('pushkey'): | ||||
yield False, None | ||||
f = future() | ||||
self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key)) | ||||
yield {'namespace': encoding.fromlocal(namespace), | ||||
'key': encoding.fromlocal(key), | ||||
'old': encoding.fromlocal(old), | ||||
'new': encoding.fromlocal(new)}, f | ||||
d = f.value | ||||
d, output = d.split('\n', 1) | ||||
try: | ||||
d = bool(int(d)) | ||||
except ValueError: | ||||
raise error.ResponseError( | ||||
_('push failed (unexpected response):'), d) | ||||
for l in output.splitlines(True): | ||||
self.ui.status(_('remote: '), l) | ||||
yield d | ||||
def stream_out(self): | ||||
return self._callstream('stream_out') | ||||
def getbundle(self, source, **kwargs): | ||||
kwargs = pycompat.byteskwargs(kwargs) | ||||
self.requirecap('getbundle', _('look up remote changes')) | ||||
opts = {} | ||||
bundlecaps = kwargs.get('bundlecaps') or set() | ||||
for key, value in kwargs.iteritems(): | ||||
if value is None: | ||||
continue | ||||
keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key) | ||||
if keytype is None: | ||||
raise error.ProgrammingError( | ||||
'Unexpectedly None keytype for key %s' % key) | ||||
elif keytype == 'nodes': | ||||
value = wireprototypes.encodelist(value) | ||||
elif keytype == 'csv': | ||||
value = ','.join(value) | ||||
elif keytype == 'scsv': | ||||
value = ','.join(sorted(value)) | ||||
elif keytype == 'boolean': | ||||
value = '%i' % bool(value) | ||||
elif keytype != 'plain': | ||||
raise KeyError('unknown getbundle option type %s' | ||||
% keytype) | ||||
opts[key] = value | ||||
f = self._callcompressable("getbundle", **pycompat.strkwargs(opts)) | ||||
if any((cap.startswith('HG2') for cap in bundlecaps)): | ||||
return bundle2.getunbundler(self.ui, f) | ||||
else: | ||||
return changegroupmod.cg1unpacker(f, 'UN') | ||||
def unbundle(self, cg, heads, url): | ||||
'''Send cg (a readable file-like object representing the | ||||
changegroup to push, typically a chunkbuffer object) to the | ||||
remote server as a bundle. | ||||
When pushing a bundle10 stream, return an integer indicating the | ||||
result of the push (see changegroup.apply()). | ||||
When pushing a bundle20 stream, return a bundle20 stream. | ||||
`url` is the url the client thinks it's pushing to, which is | ||||
visible to hooks. | ||||
''' | ||||
if heads != ['force'] and self.capable('unbundlehash'): | ||||
heads = wireprototypes.encodelist( | ||||
['hashed', hashlib.sha1(''.join(sorted(heads))).digest()]) | ||||
else: | ||||
heads = wireprototypes.encodelist(heads) | ||||
if util.safehasattr(cg, 'deltaheader'): | ||||
# this a bundle10, do the old style call sequence | ||||
ret, output = self._callpush("unbundle", cg, heads=heads) | ||||
if ret == "": | ||||
raise error.ResponseError( | ||||
_('push failed:'), output) | ||||
try: | ||||
ret = int(ret) | ||||
except ValueError: | ||||
raise error.ResponseError( | ||||
_('push failed (unexpected response):'), ret) | ||||
for l in output.splitlines(True): | ||||
self.ui.status(_('remote: '), l) | ||||
else: | ||||
# bundle2 push. Send a stream, fetch a stream. | ||||
stream = self._calltwowaystream('unbundle', cg, heads=heads) | ||||
ret = bundle2.getunbundler(self.ui, stream) | ||||
return ret | ||||
# End of ipeercommands interface. | ||||
# Begin of ipeerlegacycommands interface. | ||||
def branches(self, nodes): | ||||
n = wireprototypes.encodelist(nodes) | ||||
d = self._call("branches", nodes=n) | ||||
try: | ||||
br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()] | ||||
return br | ||||
except ValueError: | ||||
self._abort(error.ResponseError(_("unexpected response:"), d)) | ||||
def between(self, pairs): | ||||
batch = 8 # avoid giant requests | ||||
r = [] | ||||
for i in xrange(0, len(pairs), batch): | ||||
n = " ".join([wireprototypes.encodelist(p, '-') | ||||
for p in pairs[i:i + batch]]) | ||||
d = self._call("between", pairs=n) | ||||
try: | ||||
r.extend(l and wireprototypes.decodelist(l) or [] | ||||
for l in d.splitlines()) | ||||
except ValueError: | ||||
self._abort(error.ResponseError(_("unexpected response:"), d)) | ||||
return r | ||||
Gregory Szorc
|
r37653 | def changegroup(self, nodes, source): | ||
Gregory Szorc
|
r37632 | n = wireprototypes.encodelist(nodes) | ||
f = self._callcompressable("changegroup", roots=n) | ||||
return changegroupmod.cg1unpacker(f, 'UN') | ||||
Gregory Szorc
|
r37653 | def changegroupsubset(self, bases, heads, source): | ||
Gregory Szorc
|
r37632 | self.requirecap('changegroupsubset', _('look up remote changes')) | ||
bases = wireprototypes.encodelist(bases) | ||||
heads = wireprototypes.encodelist(heads) | ||||
f = self._callcompressable("changegroupsubset", | ||||
bases=bases, heads=heads) | ||||
return changegroupmod.cg1unpacker(f, 'UN') | ||||
# End of ipeerlegacycommands interface. | ||||
def _submitbatch(self, req): | ||||
"""run batch request <req> on the server | ||||
Returns an iterator of the raw responses from the server. | ||||
""" | ||||
ui = self.ui | ||||
if ui.debugflag and ui.configbool('devel', 'debug.peer-request'): | ||||
ui.debug('devel-peer-request: batched-content\n') | ||||
for op, args in req: | ||||
msg = 'devel-peer-request: - %s (%d arguments)\n' | ||||
ui.debug(msg % (op, len(args))) | ||||
unescapearg = wireprototypes.unescapebatcharg | ||||
rsp = self._callstream("batch", cmds=encodebatchcmds(req)) | ||||
chunk = rsp.read(1024) | ||||
work = [chunk] | ||||
while chunk: | ||||
while ';' not in chunk and chunk: | ||||
chunk = rsp.read(1024) | ||||
work.append(chunk) | ||||
merged = ''.join(work) | ||||
while ';' in merged: | ||||
one, merged = merged.split(';', 1) | ||||
yield unescapearg(one) | ||||
chunk = rsp.read(1024) | ||||
work = [merged, chunk] | ||||
yield unescapearg(''.join(work)) | ||||
def _submitone(self, op, args): | ||||
return self._call(op, **pycompat.strkwargs(args)) | ||||
def debugwireargs(self, one, two, three=None, four=None, five=None): | ||||
# don't pass optional arguments left at their default value | ||||
opts = {} | ||||
if three is not None: | ||||
opts[r'three'] = three | ||||
if four is not None: | ||||
opts[r'four'] = four | ||||
return self._call('debugwireargs', one=one, two=two, **opts) | ||||
def _call(self, cmd, **args): | ||||
"""execute <cmd> on the server | ||||
The command is expected to return a simple string. | ||||
returns the server reply as a string.""" | ||||
raise NotImplementedError() | ||||
def _callstream(self, cmd, **args): | ||||
"""execute <cmd> on the server | ||||
The command is expected to return a stream. Note that if the | ||||
command doesn't return a stream, _callstream behaves | ||||
differently for ssh and http peers. | ||||
returns the server reply as a file like object. | ||||
""" | ||||
raise NotImplementedError() | ||||
def _callcompressable(self, cmd, **args): | ||||
"""execute <cmd> on the server | ||||
The command is expected to return a stream. | ||||
The stream may have been compressed in some implementations. This | ||||
function takes care of the decompression. This is the only difference | ||||
with _callstream. | ||||
returns the server reply as a file like object. | ||||
""" | ||||
raise NotImplementedError() | ||||
def _callpush(self, cmd, fp, **args): | ||||
"""execute a <cmd> on server | ||||
The command is expected to be related to a push. Push has a special | ||||
return method. | ||||
returns the server reply as a (ret, output) tuple. ret is either | ||||
empty (error) or a stringified int. | ||||
""" | ||||
raise NotImplementedError() | ||||
def _calltwowaystream(self, cmd, fp, **args): | ||||
"""execute <cmd> on server | ||||
The command will send a stream to the server and get a stream in reply. | ||||
""" | ||||
raise NotImplementedError() | ||||
def _abort(self, exception): | ||||
"""clearly abort the wire protocol connection and raise the exception | ||||
""" | ||||
raise NotImplementedError() | ||||