wireprotov1peer.py
648 lines
| 21.4 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotov1peer.py
Gregory Szorc
|
r37632 | # wireprotov1peer.py - Client-side functionality for wire protocol version 1. | ||
# | ||||
Raphaël Gomès
|
r47575 | # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com> | ||
Gregory Szorc
|
r37632 | # | ||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Gregory Szorc
|
r37648 | import sys | ||
Gregory Szorc
|
r37649 | import weakref | ||
Gregory Szorc
|
r37632 | |||
Augie Fackler
|
r49690 | from concurrent import futures | ||
Gregory Szorc
|
r37632 | from .i18n import _ | ||
Augie Fackler
|
r43346 | from .node import bin | ||
Gregory Szorc
|
r43359 | from .pycompat import ( | ||
getattr, | ||||
setattr, | ||||
) | ||||
Gregory Szorc
|
r37632 | from . import ( | ||
bundle2, | ||||
changegroup as changegroupmod, | ||||
encoding, | ||||
error, | ||||
pushkey as pushkeymod, | ||||
pycompat, | ||||
util, | ||||
wireprototypes, | ||||
) | ||||
Pulkit Goyal
|
r43078 | from .interfaces import ( | ||
repository, | ||||
Pulkit Goyal
|
r43079 | util as interfaceutil, | ||
Gregory Szorc
|
r37828 | ) | ||
Augie Fackler
|
r44517 | from .utils import hashutil | ||
Gregory Szorc
|
r37632 | |||
urlreq = util.urlreq | ||||
Augie Fackler
|
r43346 | |||
Valentin Gatien-Baron
|
def batchable(f):
    """annotation for batchable methods

    Such methods must return a pair ``(encoded_args_or_res, decode)``:

    @batchable
    def sample(self, one, two=None):
        # Build list of encoded arguments suitable for your wire protocol:
        encoded_args = [('one', encode(one),), ('two', encode(two),)]
        # Return it, along with a function that will receive the result
        # from the batched request.
        return encoded_args, decode

    If the second element of the pair is false (e.g. ``None``), the first
    element is treated as an already-computed local result and no request
    is sent.

    The decorator returns a function which wraps the method as a plain
    method, but adds the original method as an attribute called "batchable",
    which is used by the command executor to split the call into separate
    encoding and decoding phases.
    """

    def plain(*args, **opts):
        encoded_args_or_res, decode = f(*args, **opts)
        if not decode:
            return encoded_args_or_res  # a local result in this case
        self = args[0]
        cmd = pycompat.bytesurl(f.__name__)  # ensure cmd is ascii bytestr
        encoded_res = self._submitone(cmd, encoded_args_or_res)
        return decode(encoded_res)

    setattr(plain, 'batchable', f)
    setattr(plain, '__name__', f.__name__)
    # Keep the wrapped method's documentation visible on the wrapper.
    plain.__doc__ = f.__doc__
    return plain
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def encodebatchcmds(req):
    """Return a ``cmds`` argument value for the ``batch`` command.

    ``req`` is an iterable of ``(command name, arguments dict)`` pairs.
    Each command is rendered as ``name key=value,key=value`` and the
    commands are joined with ``;``.
    """
    escape = wireprototypes.escapebatcharg
    encoded = []
    for name, arguments in req:
        # Old servers didn't properly unescape argument names. So prevent
        # the sending of argument names that may not be decoded properly by
        # servers.
        assert all(escape(key) == key for key in arguments)

        pairs = [
            b'%s=%s' % (escape(key), escape(value))
            for key, value in arguments.items()
        ]
        encoded.append(b'%s %s' % (name, b','.join(pairs)))

    return b';'.join(encoded)
Gregory Szorc
|
r37632 | |||
Augie Fackler
|
r43346 | |||
Augie Fackler
|
class unsentfuture(futures.Future):
    """A Future variation to represent an unsent command.

    Because we buffer commands and don't submit them immediately, calling
    ``result()`` on an unsent future could deadlock. Futures for buffered
    commands are represented by this type, which wraps ``result()`` to
    call ``sendcommands()``.
    """

    def result(self, timeout=None):
        """Flush the owning executor if needed, then return the result."""
        if not self.done():
            self._peerexecutor.sendcommands()

            # This looks like it will infinitely recurse. However,
            # sendcommands() should modify __class__. This call serves as a
            # check on that.
            return self.result(timeout)

        return futures.Future.result(self, timeout)
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
@interfaceutil.implementer(repository.ipeercommandexecutor)
class peerexecutor:
    """Command executor that buffers batchable commands for a peer.

    Batchable commands are queued by ``callcommand()`` and sent together by
    ``sendcommands()`` (which ``close()`` and ``result()`` on a queued
    future both trigger). A non-batchable command is sent immediately and
    cannot share an executor with other commands.
    """

    def __init__(self, peer):
        self._peer = peer
        self._sent = False
        self._closed = False
        # Queued (command, args, fn, future) tuples; None once sent.
        self._calls = []
        self._futures = weakref.WeakSet()
        # Set only when a multi-command batch is in flight.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalee, exctb):
        self.close()

    def callcommand(self, command, args):
        """Queue (or immediately send) ``command`` and return its future.

        Raises ``error.ProgrammingError`` if commands were already sent, if
        the executor is closed, if the peer has no method named after
        ``command``, or if a non-batchable command is mixed with others.
        """
        if self._sent:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after commands are sent'
            )

        if self._closed:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after close()'
            )

        # Commands are dispatched through methods on the peer.
        fn = getattr(self._peer, pycompat.sysstr(command), None)

        if not fn:
            raise error.ProgrammingError(
                b'cannot call command %s: method of same name not available '
                b'on peer' % command
            )

        # Commands are either batchable or they aren't. If a command
        # isn't batchable, we send it immediately because the executor
        # can no longer accept new commands after a non-batchable command.
        # If a command is batchable, we queue it for later. But we have
        # to account for the case of a non-batchable command arriving after
        # a batchable one and refuse to service it.

        def addcall():
            f = futures.Future()
            self._futures.add(f)
            self._calls.append((command, args, fn, f))
            return f

        if getattr(fn, 'batchable', False):
            f = addcall()

            # But since we don't issue it immediately, we wrap its result()
            # to trigger sending so we avoid deadlocks.
            f.__class__ = unsentfuture
            f._peerexecutor = self
        else:
            if self._calls:
                raise error.ProgrammingError(
                    b'%s is not batchable and cannot be called on a command '
                    b'executor along with other commands' % command
                )

            f = addcall()

            # Non-batchable commands can never coexist with another command
            # in this executor. So send the command immediately.
            self.sendcommands()

        return f

    @staticmethod
    def _failunresolved(pending):
        """Set an error on every future in ``pending`` that is not done."""
        for f in pending:
            if not f.done():
                f.set_exception(
                    error.ResponseError(
                        _(b'unfulfilled batch command response'), None
                    )
                )

    def sendcommands(self):
        """Send all queued commands. Does nothing if already sent or empty."""
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so caller sees a clean type and to break
        # cycle between us and futures.
        for f in self._futures:
            if isinstance(f, unsentfuture):
                f.__class__ = futures.Future
                f._peerexecutor = None

        calls = self._calls
        # Mainly to destroy references to futures.
        self._calls = None

        # Simple case of a single command. We call it synchronously.
        if len(calls) == 1:
            command, args, fn, f = calls[0]

            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                return

            try:
                result = fn(**pycompat.strkwargs(args))
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
            else:
                f.set_result(result)

            return

        # Batch commands are a bit harder. First, we have to deal with the
        # @batchable protocol: each method returns its encoded arguments
        # and a decode function. Furthermore, we also need to preserve
        # streaming. i.e. it should be possible for the futures to resolve
        # as data is coming in off the wire without having to wait for the
        # final byte of the final response. We do this by spinning up a
        # thread to read the responses.

        requests = []
        states = []

        for command, args, fn, f in calls:
            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                continue

            try:
                encoded_args_or_res, decode = fn.batchable(
                    fn.__self__, **pycompat.strkwargs(args)
                )
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                # The batch is abandoned and nothing is sent. Fail every
                # other future that has not resolved yet; otherwise a
                # result() call on one of them would wait forever, since
                # no response reader will ever be started for them.
                self._failunresolved([c[3] for c in calls])
                return

            if not decode:
                # The method produced a local result; nothing to send.
                f.set_result(encoded_args_or_res)
            else:
                requests.append((command, encoded_args_or_res))
                states.append((command, f, decode))

        if not requests:
            return

        # This will emit responses in order they were executed.
        wireresults = self._peer._submitbatch(requests)

        # The use of a thread pool executor here is a bit weird for something
        # that only spins up a single thread. However, thread management is
        # hard and it is easy to encounter race conditions, deadlocks, etc.
        # concurrent.futures already solves these problems and its thread pool
        # executor has minimal overhead. So we use it.
        self._responseexecutor = futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(
            self._readbatchresponse, states, wireresults
        )

    def close(self):
        """Send anything still queued and wait for in-flight responses."""
        self.sendcommands()

        if self._closed:
            return

        self._closed = True

        if not self._responsef:
            return

        # We need to wait on our in-flight response and then shut down the
        # executor once we have a result.
        try:
            self._responsef.result()
        finally:
            self._responseexecutor.shutdown(wait=True)
            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored. Otherwise a result() could wait indefinitely.
            self._failunresolved(self._futures)

            self._futures = None

    def _readbatchresponse(self, states, wireresults):
        """Resolve each ``(command, future, decode)`` state in order.

        ``wireresults`` is an iterator yielding one raw response per state.
        """
        # Executes in a thread to read data off the wire.

        for command, f, decode in states:
            # Grab raw result off the wire and teach the internal future
            # about it.
            try:
                remoteresult = next(wireresults)
            except StopIteration:
                # This can happen in particular because decode() in the
                # previous iteration can call peer._abort, which may close
                # the peer.
                f.set_exception(
                    error.ResponseError(
                        _(b'unfulfilled batch command response'), None
                    )
                )
            else:
                try:
                    result = decode(remoteresult)
                except Exception:
                    pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                else:
                    f.set_result(result)
Gregory Szorc
|
r37649 | |||
Augie Fackler
|
r43346 | |||
@interfaceutil.implementer(
    repository.ipeercommands, repository.ipeerlegacycommands
)
class wirepeer(repository.peer):
    """Client-side interface for communicating with a peer repository.

    Methods commonly call wire protocol commands of the same name.

    See also httppeer.py and sshpeer.py for protocol-specific
    implementations of this interface.
    """

    def commandexecutor(self):
        return peerexecutor(self)

    # Begin of ipeercommands interface.

    def clonebundles(self):
        self.requirecap(b'clonebundles', _(b'clone bundles'))
        return self._call(b'clonebundles')

    @batchable
    def lookup(self, key):
        self.requirecap(b'lookup', _(b'look up remote revision'))

        def decode(d):
            # Response is "<success flag> <data>" followed by one trailing
            # byte, which is dropped before splitting.
            success, data = d[:-1].split(b" ", 1)
            if int(success):
                return bin(data)
            else:
                self._abort(error.RepoError(data))

        return {b'key': encoding.fromlocal(key)}, decode

    @batchable
    def heads(self):
        def decode(d):
            try:
                return wireprototypes.decodelist(d[:-1])
            except ValueError:
                self._abort(error.ResponseError(_(b"unexpected response:"), d))

        return {}, decode

    @batchable
    def known(self, nodes):
        def decode(d):
            # One digit per requested node.
            try:
                return [bool(int(b)) for b in pycompat.iterbytestr(d)]
            except ValueError:
                self._abort(error.ResponseError(_(b"unexpected response:"), d))

        return {b'nodes': wireprototypes.encodelist(nodes)}, decode

    @batchable
    def branchmap(self):
        def decode(d):
            try:
                branchmap = {}
                for branchpart in d.splitlines():
                    branchname, branchheads = branchpart.split(b' ', 1)
                    branchname = encoding.tolocal(urlreq.unquote(branchname))
                    branchheads = wireprototypes.decodelist(branchheads)
                    branchmap[branchname] = branchheads
                return branchmap
            except (TypeError, ValueError):
                # A line without a space fails to unpack with ValueError,
                # so it must be reported as a bad response as well.
                self._abort(error.ResponseError(_(b"unexpected response:"), d))

        return {}, decode

    @batchable
    def listkeys(self, namespace):
        if not self.capable(b'pushkey'):
            # Local result: no request is sent.
            return {}, None
        self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)

        def decode(d):
            self.ui.debug(
                b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
            )
            return pushkeymod.decodekeys(d)

        return {b'namespace': encoding.fromlocal(namespace)}, decode

    @batchable
    def pushkey(self, namespace, key, old, new):
        if not self.capable(b'pushkey'):
            # Local result: no request is sent.
            return False, None
        self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))

        def decode(d):
            # First line is the integer result; the rest is remote output.
            d, output = d.split(b'\n', 1)
            try:
                d = bool(int(d))
            except ValueError:
                raise error.ResponseError(
                    _(b'push failed (unexpected response):'), d
                )
            for l in output.splitlines(True):
                self.ui.status(_(b'remote: '), l)
            return d

        return {
            b'namespace': encoding.fromlocal(namespace),
            b'key': encoding.fromlocal(key),
            b'old': encoding.fromlocal(old),
            b'new': encoding.fromlocal(new),
        }, decode

    def stream_out(self):
        return self._callstream(b'stream_out')

    def getbundle(self, source, **kwargs):
        kwargs = pycompat.byteskwargs(kwargs)
        self.requirecap(b'getbundle', _(b'look up remote changes'))
        opts = {}
        bundlecaps = kwargs.get(b'bundlecaps') or set()
        for key, value in kwargs.items():
            if value is None:
                continue
            # Encode each argument according to its declared wire type.
            keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
            if keytype is None:
                raise error.ProgrammingError(
                    b'Unexpectedly None keytype for key %s' % key
                )
            elif keytype == b'nodes':
                value = wireprototypes.encodelist(value)
            elif keytype == b'csv':
                value = b','.join(value)
            elif keytype == b'scsv':
                value = b','.join(sorted(value))
            elif keytype == b'boolean':
                value = b'%i' % bool(value)
            elif keytype != b'plain':
                raise KeyError(b'unknown getbundle option type %s' % keytype)
            opts[key] = value
        f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
        if any(cap.startswith(b'HG2') for cap in bundlecaps):
            return bundle2.getunbundler(self.ui, f)
        else:
            return changegroupmod.cg1unpacker(f, b'UN')

    def unbundle(self, bundle, heads, url):
        """Send bundle (a readable file-like object representing the
        changegroup to push, typically a chunkbuffer object) to the
        remote server as a bundle.

        When pushing a bundle10 stream, return an integer indicating the
        result of the push (see changegroup.apply()).

        When pushing a bundle20 stream, return a bundle20 stream.

        `url` is the url the client thinks it's pushing to, which is
        visible to hooks.
        """

        if heads != [b'force'] and self.capable(b'unbundlehash'):
            heads = wireprototypes.encodelist(
                [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
            )
        else:
            heads = wireprototypes.encodelist(heads)

        if util.safehasattr(bundle, b'deltaheader'):
            # this a bundle10, do the old style call sequence
            ret, output = self._callpush(b"unbundle", bundle, heads=heads)
            if ret == b"":
                raise error.ResponseError(_(b'push failed:'), output)
            try:
                ret = int(ret)
            except ValueError:
                raise error.ResponseError(
                    _(b'push failed (unexpected response):'), ret
                )

            for l in output.splitlines(True):
                self.ui.status(_(b'remote: '), l)
        else:
            # bundle2 push. Send a stream, fetch a stream.
            stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
            ret = bundle2.getunbundler(self.ui, stream)
        return ret

    # End of ipeercommands interface.

    # Begin of ipeerlegacycommands interface.

    def branches(self, nodes):
        n = wireprototypes.encodelist(nodes)
        d = self._call(b"branches", nodes=n)
        try:
            br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
            return br
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), d))

    def between(self, pairs):
        batch = 8  # avoid giant requests
        r = []
        for i in range(0, len(pairs), batch):
            n = b" ".join(
                [
                    wireprototypes.encodelist(p, b'-')
                    for p in pairs[i : i + batch]
                ]
            )
            d = self._call(b"between", pairs=n)
            try:
                # An empty response line stands for an empty list.
                r.extend(
                    (wireprototypes.decodelist(l) or []) if l else []
                    for l in d.splitlines()
                )
            except ValueError:
                self._abort(error.ResponseError(_(b"unexpected response:"), d))
        return r

    def changegroup(self, nodes, source):
        n = wireprototypes.encodelist(nodes)
        f = self._callcompressable(b"changegroup", roots=n)
        return changegroupmod.cg1unpacker(f, b'UN')

    def changegroupsubset(self, bases, heads, source):
        self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
        bases = wireprototypes.encodelist(bases)
        heads = wireprototypes.encodelist(heads)
        f = self._callcompressable(
            b"changegroupsubset", bases=bases, heads=heads
        )
        return changegroupmod.cg1unpacker(f, b'UN')

    # End of ipeerlegacycommands interface.

    def _submitbatch(self, req):
        """run batch request <req> on the server

        Returns an iterator of the raw responses from the server.
        """
        ui = self.ui
        if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
            ui.debug(b'devel-peer-request: batched-content\n')
            for op, args in req:
                msg = b'devel-peer-request:   - %s (%d arguments)\n'
                ui.debug(msg % (op, len(args)))

        unescapearg = wireprototypes.unescapebatcharg

        rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
        chunk = rsp.read(1024)
        work = [chunk]
        while chunk:
            # Keep reading until a ';' separator shows up or the stream ends.
            while b';' not in chunk and chunk:
                chunk = rsp.read(1024)
                work.append(chunk)
            merged = b''.join(work)
            # Emit every complete response; the tail after the last ';' is
            # carried over into the next round.
            while b';' in merged:
                one, merged = merged.split(b';', 1)
                yield unescapearg(one)
            chunk = rsp.read(1024)
            work = [merged, chunk]
        # Whatever is left once the stream is exhausted is the last response.
        yield unescapearg(b''.join(work))

    def _submitone(self, op, args):
        return self._call(op, **pycompat.strkwargs(args))

    def debugwireargs(self, one, two, three=None, four=None, five=None):
        # don't pass optional arguments left at their default value
        # (``five`` is accepted but never forwarded)
        opts = {}
        if three is not None:
            opts['three'] = three
        if four is not None:
            opts['four'] = four
        return self._call(b'debugwireargs', one=one, two=two, **opts)

    def _call(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a simple string.

        returns the server reply as a string."""
        raise NotImplementedError()

    def _callstream(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream. Note that if the
        command doesn't return a stream, _callstream behaves
        differently for ssh and http peers.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callcompressable(self, cmd, **args):
        """execute <cmd> on the server

        The command is expected to return a stream.

        The stream may have been compressed in some implementations. This
        function takes care of the decompression. This is the only difference
        with _callstream.

        returns the server reply as a file like object.
        """
        raise NotImplementedError()

    def _callpush(self, cmd, fp, **args):
        """execute a <cmd> on server

        The command is expected to be related to a push. Push has a special
        return method.

        returns the server reply as a (ret, output) tuple. ret is either
        empty (error) or a stringified int.
        """
        raise NotImplementedError()

    def _calltwowaystream(self, cmd, fp, **args):
        """execute <cmd> on server

        The command will send a stream to the server and get a stream in reply.
        """
        raise NotImplementedError()

    def _abort(self, exception):
        """clearly abort the wire protocol connection and raise the exception"""
        raise NotImplementedError()