##// END OF EJS Templates
hgweb: stop using the `pycompat.open()` shim
hgweb: stop using the `pycompat.open()` shim

File last commit:

r53023:8b791764 default
r53263:c9baa354 default
Show More
wireprotov1peer.py
666 lines | 22.1 KiB | text/x-python | PythonLexer
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
#
Raphaël Gomès
contributor: change mentions of mpm to olivia...
r47575 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 #
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Matt Harbison
typing: add `from __future__ import annotations` to most files...
r52756 from __future__ import annotations
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 import sys
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 import weakref
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 from concurrent import futures
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 from .i18n import _
Augie Fackler
formatting: blacken the codebase...
r43346 from .node import bin
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 from . import (
bundle2,
changegroup as changegroupmod,
encoding,
error,
pushkey as pushkeymod,
pycompat,
util,
wireprototypes,
)
Pulkit Goyal
interfaces: create a new folder for interfaces and move repository.py in it...
r43078 from .interfaces import (
repository,
Pulkit Goyal
interfaceutil: move to interfaces/...
r43079 util as interfaceutil,
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 )
Augie Fackler
core: migrate uses of hashlib.sha1 to hashutil.sha1...
r44517 from .utils import hashutil
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
urlreq = util.urlreq
Augie Fackler
formatting: blacken the codebase...
r43346
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
def batchable(f):
    """Decorator marking a peer method as usable inside a ``batch`` request.

    The decorated method does not talk to the server itself. It returns a
    pair ``(encoded_args_or_result, decode)``::

        @batchable
        def sample(self, one, two=None):
            # Encode the arguments for the wire protocol.
            encoded_args = [('one', encode(one),), ('two', encode(two),)]
            # Hand them back together with the callable that turns the raw
            # response of the batched request into the final result.
            return encoded_args, decode

    When ``decode`` is falsy, the first element is treated as a result that
    was computed locally and no request is made.

    The returned wrapper behaves like a plain method: it submits the command
    named after ``f`` through ``self._submitone()`` and decodes the reply.
    The undecorated function stays reachable as the ``batchable`` attribute
    of the wrapper so that batching code can run the encoding and decoding
    phases separately.
    """

    def plain(*args, **opts):
        payload, decoder = f(*args, **opts)
        if decoder:
            peer = args[0]
            # The command name sent on the wire must be an ASCII bytestring.
            wirecmd = pycompat.bytesurl(f.__name__)
            return decoder(peer._submitone(wirecmd, payload))
        # No decoder: ``payload`` already is the (local) result.
        return payload

    plain.batchable = f
    plain.__name__ = f.__name__
    return plain
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
def encodebatchcmds(req):
    """Build the ``cmds`` argument value for the ``batch`` command.

    ``req`` is an iterable of ``(op, argsdict)`` pairs. Each pair is rendered
    as ``op name=value,name=value`` and the rendered commands are joined
    with ``;``.
    """
    escape = wireprototypes.escapebatcharg

    def encodeone(op, argsdict):
        # Old servers didn't properly unescape argument names, so refuse to
        # send a name that escaping would alter.
        assert all(escape(name) == name for name in argsdict)

        pairs = [
            b'%s=%s' % (escape(name), escape(value))
            for name, value in argsdict.items()
        ]
        return b'%s %s' % (op, b','.join(pairs))

    return b';'.join(encodeone(op, argsdict) for op, argsdict in req)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
class unsentfuture(futures.Future):
    """Future standing in for a command that is buffered but not yet sent.

    Buffered commands are not submitted right away, so waiting on one with
    ``result()`` could deadlock. This type overrides ``result()`` to first
    ask the owning executor to ``sendcommands()``.
    """

    # Executor that buffered the command; assigned by the executor itself.
    _peerexecutor: "peerexecutor"

    def result(self, timeout=None):
        """Flush the owning executor if needed, then return the result."""
        if not self.done():
            self._peerexecutor.sendcommands()

            # Not the endless recursion it appears to be: sendcommands() is
            # expected to have swapped __class__, so this dispatches to the
            # regular Future implementation. The call doubles as a check
            # that the swap really happened.
            return self.result(timeout)

        return futures.Future.result(self, timeout)
Augie Fackler
formatting: blacken the codebase...
r43346
Matt Harbison
typing: suppress bogus pytype errors in `mercurial/wireprotov1peer.py`...
r53023 # @interfaceutil.implementer(repository.ipeercommandexecutor)
Gregory Szorc
py3: use class X: instead of class X(object):...
class peerexecutor:
    """Queue commands for a version 1 peer and expose results as futures.

    Batchable commands are buffered until ``sendcommands()`` runs, which
    happens explicitly, when ``result()`` is called on one of the buffered
    futures, or from ``close()``. A non-batchable command is sent as soon
    as it is requested and cannot share an executor with other commands.

    The object is a context manager; leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, peer):
        self._peer = peer
        self._sent = False
        self._closed = False
        # Pending (command, args, fn, future) tuples; None once sent.
        self._calls = []
        # Futures handed out to callers, tracked through weak references.
        self._futures = weakref.WeakSet()
        # Only populated when a multi-command batch is put on the wire.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalee, exctb):
        self.close()

    def callcommand(self, command, args):
        """Request ``command`` with ``args`` and return a future for it.

        Raises ``error.ProgrammingError`` when commands were already sent,
        when the executor is closed, when the peer has no method named
        after the command, or when a non-batchable command is mixed with
        other commands.
        """
        if self._sent:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after commands are sent'
            )

        if self._closed:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after close()'
            )

        # Commands are dispatched through methods on the peer.
        fn = getattr(self._peer, pycompat.sysstr(command), None)

        if not fn:
            raise error.ProgrammingError(
                b'cannot call command %s: method of same name not available '
                b'on peer' % command
            )

        # Commands are either batchable or they aren't. If a command
        # isn't batchable, we send it immediately because the executor
        # can no longer accept new commands after a non-batchable command.
        # If a command is batchable, we queue it for later. But we have
        # to account for the case of a non-batchable command arriving after
        # a batchable one and refuse to service it.

        def addcall():
            f = futures.Future()
            self._futures.add(f)
            self._calls.append((command, args, fn, f))
            return f

        if getattr(fn, 'batchable', False):
            f = addcall()

            # But since we don't issue it immediately, we wrap its result()
            # to trigger sending so we avoid deadlocks.
            f.__class__ = unsentfuture
            f._peerexecutor = self
        else:
            if self._calls:
                raise error.ProgrammingError(
                    b'%s is not batchable and cannot be called on a command '
                    b'executor along with other commands' % command
                )

            f = addcall()

            # Non-batchable commands can never coexist with another command
            # in this executor. So send the command immediately.
            self.sendcommands()

        return f

    def _failunresolved(self):
        """Mark every tracked future that has not completed as errored.

        Without this a caller blocked in (or later calling) ``result()`` on
        such a future would wait indefinitely.
        """
        for f in list(self._futures or ()):
            if not f.done():
                f.set_exception(
                    error.ResponseError(
                        _(b'unfulfilled batch command response'), None
                    )
                )

    def sendcommands(self):
        """Send the queued commands; a no-op if already sent or empty.

        A single queued command is invoked synchronously. Several commands
        are encoded into one batch request whose responses are read by a
        helper thread.
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so the caller sees a clean type and to
        # break the cycle between us and the futures.
        for f in self._futures:
            if isinstance(f, unsentfuture):
                f.__class__ = futures.Future
                f._peerexecutor = None

        calls = self._calls
        # Mainly to destroy references to futures.
        self._calls = None

        # Simple case of a single command. We call it synchronously.
        if len(calls) == 1:
            command, args, fn, f = calls[0]

            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                return

            try:
                result = fn(**pycompat.strkwargs(args))
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
            else:
                f.set_result(result)

            return

        # Batch commands are a bit harder. First, we have to deal with the
        # @batchable coroutine. That's a bit annoying. Furthermore, we also
        # need to preserve streaming. i.e. it should be possible for the
        # futures to resolve as data is coming in off the wire without having
        # to wait for the final byte of the final response. We do this by
        # spinning up a thread to read the responses.

        requests = []
        states = []

        for command, args, fn, f in calls:
            # Future was cancelled. Ignore it.
            if not f.set_running_or_notify_cancel():
                continue

            try:
                encoded_args_or_res, decode = fn.batchable(
                    fn.__self__, **pycompat.strkwargs(args)
                )
            except Exception:
                pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                # The batch is abandoned here and nothing gets sent, so no
                # response will ever arrive for the other commands. Resolve
                # their futures now instead of leaving them to block any
                # caller waiting on result().
                self._failunresolved()
                return

            if not decode:
                # Result was computed locally; nothing to send.
                f.set_result(encoded_args_or_res)
            else:
                requests.append((command, encoded_args_or_res))
                # NOTE: ``batchable`` here is the module-level decorator; the
                # slot is kept only so the tuple shape expected by
                # _readbatchresponse() stays unchanged.
                states.append((command, f, batchable, decode))

        if not requests:
            return

        # This will emit responses in order they were executed.
        wireresults = self._peer._submitbatch(requests)

        # The use of a thread pool executor here is a bit weird for something
        # that only spins up a single thread. However, thread management is
        # hard and it is easy to encounter race conditions, deadlocks, etc.
        # concurrent.futures already solves these problems and its thread pool
        # executor has minimal overhead. So we use it.
        self._responseexecutor = futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(
            self._readbatchresponse, states, wireresults
        )

    def close(self):
        """Send anything still queued, wait for responses and clean up.

        Calling it again after the first call has no further effect. Any
        future that is still unresolved afterwards is marked as errored.
        """
        self.sendcommands()

        if self._closed:
            return

        self._closed = True

        if not self._responsef:
            # No reader thread was started. Futures can still be unresolved
            # here if sendcommands() gave up on a batch, so fail them rather
            # than letting result() wait indefinitely.
            self._failunresolved()
            return

        # We need to wait on our in-flight response and then shut down the
        # executor once we have a result.
        try:
            self._responsef.result()
        finally:
            # Help pytype- this is initialized by self.sendcommands(), called
            # above.
            assert self._responseexecutor is not None
            self._responseexecutor.shutdown(wait=True)
            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored. Otherwise a result() could wait indefinitely.
            self._failunresolved()

            self._futures = None

    def _readbatchresponse(self, states, wireresults):
        """Feed each raw batch response to its decoder and future.

        Executes in a thread to read data off the wire. ``states`` holds
        ``(command, future, <unused>, decode)`` tuples in request order and
        ``wireresults`` is the iterator of raw responses.
        """
        for command, f, batchable, decode in states:
            # Grab raw result off the wire and teach the internal future
            # about it.
            try:
                remoteresult = next(wireresults)
            except StopIteration:
                # This can happen in particular because decoding in the
                # previous iteration can call peer._abort, which may close
                # the peer.
                f.set_exception(
                    error.ResponseError(
                        _(b'unfulfilled batch command response'), None
                    )
                )
            else:
                try:
                    result = decode(remoteresult)
                except Exception:
                    pycompat.future_set_exception_info(f, sys.exc_info()[1:])
                else:
                    f.set_result(result)
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649
Augie Fackler
formatting: blacken the codebase...
r43346
@interfaceutil.implementer(
repository.ipeercommands, repository.ipeerlegacycommands
)
Gregory Szorc
wireproto: convert legacy commands to command executor...
r37653 class wirepeer(repository.peer):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 """Client-side interface for communicating with a peer repository.
Methods commonly call wire protocol commands of the same name.
See also httppeer.py and sshpeer.py for protocol-specific
implementations of this interface.
"""
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
def commandexecutor(self):
    """Return a new ``peerexecutor`` bound to this peer."""
    executor = peerexecutor(self)
    return executor
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 # Begin of ipeercommands interface.
Gregory Szorc
wireproto: properly call clonebundles command...
def clonebundles(self):
    """Fetch the clone bundles data from the remote.

    Uses the ``clonebundles_manifest`` command when the peer advertises
    that capability. Otherwise the ``clonebundles`` capability is required
    and the command of the same name is called.
    """
    if self.capable(b'clonebundles_manifest'):
        return self._call(b'clonebundles_manifest')

    self.requirecap(b'clonebundles', _(b'clone bundles'))
    return self._call(b'clonebundles')
Gregory Szorc
wireproto: properly call clonebundles command...
r37667
Mathias De Mare
clonebundles: add support for inline (streaming) clonebundles...
def _finish_inline_clone_bundle(self, stream):
    """Hook called once an inline clone bundle stream has been consumed.

    Does nothing here; it exists so it can be overridden (for httppeer).
    """
    return None
clone-bundle: rename the methods and wireprotole command...
def get_cached_bundle_inline(self, path):
    """Yield the chunks of the inline clone bundle stored at ``path``.

    The response stream starts with a uvarint giving the payload length;
    exactly that many bytes are then read and yielded chunk by chunk.
    """
    response = self._callstream(b"get_cached_bundle_inline", path=path)
    size = util.uvarintdecodestream(response)

    # Never read past ``size``: SSH streams will block if more is requested.
    for piece in util.filechunkiter(response, limit=size):
        yield piece

    self._finish_inline_clone_bundle(response)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
@batchable
def lookup(self, key):
    """Batchable ``lookup`` command: resolve ``key`` on the remote.

    Requires the ``lookup`` capability. The reply is ``<flag> <data>``
    followed by one trailing byte that is discarded.
    """
    self.requirecap(b'lookup', _(b'look up remote revision'))

    def decode(d):
        flag, payload = d[:-1].split(b" ", 1)
        if not int(flag):
            # A zero flag means failure; the payload is the error text.
            self._abort(error.RepoError(payload))
            return None
        return bin(payload)

    return {b'key': encoding.fromlocal(key)}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def heads(self):
    """Batchable ``heads`` command; takes no arguments.

    The reply, minus its last byte, is decoded with
    ``wireprototypes.decodelist``.
    """

    def decode(d):
        try:
            nodes = wireprototypes.decodelist(d[:-1])
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), d))
            return None
        return nodes

    return {}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def known(self, nodes):
    """Batchable ``known`` command for ``nodes``.

    Each byte of the reply is parsed as an integer and turned into a bool,
    producing one entry per reply byte.
    """

    def decode(d):
        try:
            flags = [bool(int(digit)) for digit in pycompat.iterbytestr(d)]
        except ValueError:
            self._abort(error.ResponseError(_(b"unexpected response:"), d))
            return None
        return flags

    return {b'nodes': wireprototypes.encodelist(nodes)}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def branchmap(self):
    """Batchable ``branchmap`` command; takes no arguments.

    Every reply line is ``<quoted branch name> <encoded heads>``. The
    decoded value is a dict mapping the local-encoded branch name to the
    decoded list of heads. A malformed reply is reported through
    ``self._abort()`` with an ``error.ResponseError``.
    """

    def decode(d):
        try:
            branchmap = {}
            for branchpart in d.splitlines():
                branchname, branchheads = branchpart.split(b' ', 1)
                branchname = encoding.tolocal(urlreq.unquote(branchname))
                branchheads = wireprototypes.decodelist(branchheads)
                branchmap[branchname] = branchheads
            return branchmap
        except (TypeError, ValueError):
            # ValueError covers a line without a space (tuple unpacking
            # fails), matching what the other decoders in this class catch.
            # TypeError is kept for backward compatibility.
            self._abort(error.ResponseError(_(b"unexpected response:"), d))

    return {}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def listkeys(self, namespace):
    """Batchable ``listkeys`` command for ``namespace``.

    Without the ``pushkey`` capability an empty dict is returned locally
    and nothing is sent.
    """
    if not self.capable(b'pushkey'):
        return {}, None

    self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)

    def decode(d):
        received = len(d)
        self.ui.debug(
            b'received listkey for "%s": %i bytes\n' % (namespace, received)
        )
        return pushkeymod.decodekeys(d)

    wireargs = {b'namespace': encoding.fromlocal(namespace)}
    return wireargs, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def pushkey(self, namespace, key, old, new):
    """Batchable ``pushkey`` command.

    Without the ``pushkey`` capability ``False`` is returned locally and
    nothing is sent. Otherwise the reply is expected to be
    ``<int>\\n<output>``: the integer becomes the boolean result and every
    output line is shown through ``ui.status`` with a ``remote: `` prefix.

    Raises ``error.ResponseError`` when the reply does not have that
    shape.
    """
    if not self.capable(b'pushkey'):
        return False, None
    self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))

    def decode(d):
        status, sep, output = d.partition(b'\n')
        if not sep:
            # No newline at all: report it as a malformed response rather
            # than letting a bare ValueError from tuple unpacking escape.
            raise error.ResponseError(
                _(b'push failed (unexpected response):'), d
            )
        try:
            ok = bool(int(status))
        except ValueError:
            raise error.ResponseError(
                _(b'push failed (unexpected response):'), status
            )
        for l in output.splitlines(True):
            self.ui.status(_(b'remote: '), l)
        return ok

    return {
        b'namespace': encoding.fromlocal(namespace),
        b'key': encoding.fromlocal(key),
        b'old': encoding.fromlocal(old),
        b'new': encoding.fromlocal(new),
    }, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def stream_out(self):
    """Issue the ``stream_out`` command and return its response stream."""
    stream = self._callstream(b'stream_out')
    return stream
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def getbundle(self, source, **kwargs):
    """Issue ``getbundle`` and return an unbundler for the response.

    Keyword arguments whose value is ``None`` are dropped. The others are
    encoded according to their type in
    ``wireprototypes.GETBUNDLE_ARGUMENTS``. ``source`` is not used here.

    Returns a bundle2 unbundler when any of the ``bundlecaps`` starts with
    ``HG2``, otherwise a ``cg1unpacker``.

    Raises ``error.ProgrammingError`` for an argument with no registered
    type and ``KeyError`` for an unrecognised type.
    """
    kwargs = pycompat.byteskwargs(kwargs)
    self.requirecap(b'getbundle', _(b'look up remote changes'))
    bundlecaps = kwargs.get(b'bundlecaps') or set()

    opts = {}
    for key, value in kwargs.items():
        if value is None:
            continue

        keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
        if keytype is None:
            raise error.ProgrammingError(
                b'Unexpectedly None keytype for key %s' % key
            )

        if keytype == b'nodes':
            encoded = wireprototypes.encodelist(value)
        elif keytype == b'csv':
            encoded = b','.join(value)
        elif keytype == b'scsv':
            encoded = b','.join(sorted(value))
        elif keytype == b'boolean':
            encoded = b'%i' % bool(value)
        elif keytype == b'plain':
            encoded = value
        else:
            raise KeyError(b'unknown getbundle option type %s' % keytype)

        opts[key] = encoded

    f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))

    wantsbundle2 = any(cap.startswith(b'HG2') for cap in bundlecaps)
    if wantsbundle2:
        return bundle2.getunbundler(self.ui, f)
    return changegroupmod.cg1unpacker(f, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Gregory Szorc
wireproto: use command executor for unbundle...
def unbundle(self, bundle, heads, url):
    """Push ``bundle`` to the remote server.

    ``bundle`` is a readable file-like object representing the changegroup
    to push, typically a chunkbuffer object.

    For a bundle10 stream the return value is an integer describing the
    outcome of the push (see changegroup.apply()). For a bundle20 stream a
    bundle20 stream is returned.

    ``url`` is the url the client thinks it's pushing to, which is visible
    to hooks.
    """
    if heads == [b'force'] or not self.capable(b'unbundlehash'):
        heads = wireprototypes.encodelist(heads)
    else:
        # Send a digest of the sorted heads instead of the full list.
        digest = hashutil.sha1(b''.join(sorted(heads))).digest()
        heads = wireprototypes.encodelist([b'hashed', digest])

    if not hasattr(bundle, 'deltaheader'):
        # bundle2 push. Send a stream, fetch a stream.
        stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
        return bundle2.getunbundler(self.ui, stream)

    # This is a bundle10: use the old style call sequence.
    ret, output = self._callpush(b"unbundle", bundle, heads=heads)
    if ret == b"":
        raise error.ResponseError(_(b'push failed:'), output)
    try:
        ret = int(ret)
    except ValueError:
        raise error.ResponseError(
            _(b'push failed (unexpected response):'), ret
        )

    for line in output.splitlines(True):
        self.ui.status(_(b'remote: '), line)
    return ret
# End of ipeercommands interface.
# Begin of ipeerlegacycommands interface.
def branches(self, nodes):
    """Issue the legacy ``branches`` command for ``nodes``.

    Returns one tuple per reply line, each built from the decoded list on
    that line. An undecodable reply is reported through ``self._abort()``.
    """
    encoded = wireprototypes.encodelist(nodes)
    reply = self._call(b"branches", nodes=encoded)
    try:
        decoded = [
            tuple(wireprototypes.decodelist(line))
            for line in reply.splitlines()
        ]
    except ValueError:
        self._abort(error.ResponseError(_(b"unexpected response:"), reply))
        return None
    return decoded
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def between(self, pairs):
    """Issue the legacy ``between`` command for ``pairs``.

    The pairs are sent eight at a time to avoid giant requests. The result
    has one decoded list per reply line, an empty list for an empty line.
    """
    chunksize = 8  # avoid giant requests
    results = []
    for start in range(0, len(pairs), chunksize):
        group = pairs[start : start + chunksize]
        encoded = b" ".join(
            [wireprototypes.encodelist(pair, b'-') for pair in group]
        )
        reply = self._call(b"between", pairs=encoded)
        try:
            results.extend(
                (wireprototypes.decodelist(line) or []) if line else []
                for line in reply.splitlines()
            )
        except ValueError:
            self._abort(
                error.ResponseError(_(b"unexpected response:"), reply)
            )
    return results
Gregory Szorc
wireproto: convert legacy commands to command executor...
def changegroup(self, nodes, source):
    """Issue the legacy ``changegroup`` command rooted at ``nodes``.

    Returns a ``cg1unpacker`` over the response. ``source`` is not used
    here.
    """
    roots = wireprototypes.encodelist(nodes)
    stream = self._callcompressable(b"changegroup", roots=roots)
    return changegroupmod.cg1unpacker(stream, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Gregory Szorc
wireproto: convert legacy commands to command executor...
def changegroupsubset(self, bases, heads, source):
    """Issue the legacy ``changegroupsubset`` command.

    Requires the ``changegroupsubset`` capability. Returns a
    ``cg1unpacker`` over the response. ``source`` is not used here.
    """
    self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
    encodedbases = wireprototypes.encodelist(bases)
    encodedheads = wireprototypes.encodelist(heads)
    stream = self._callcompressable(
        b"changegroupsubset", bases=encodedbases, heads=encodedheads
    )
    return changegroupmod.cg1unpacker(stream, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
# End of ipeerlegacycommands interface.
def _submitbatch(self, req):
    """run batch request <req> on the server

    ``req`` is iterated as ``(op, args)`` pairs for the optional debug
    output and is then handed to ``encodebatchcmds`` to build the ``cmds``
    argument of the ``batch`` wire command.

    Returns an iterator of the raw responses from the server. This is a
    generator: the reply stream is read in 1024-byte chunks, split on
    ``b';'``, and every piece is passed through
    ``wireprototypes.unescapebatcharg`` before being yielded. The data
    left after the last ``b';'`` (possibly empty) is always yielded last.
    """
    ui = self.ui
    if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
        ui.debug(b'devel-peer-request: batched-content\n')
        for op, args in req:
            msg = b'devel-peer-request: - %s (%d arguments)\n'
            ui.debug(msg % (op, len(args)))

    unescapearg = wireprototypes.unescapebatcharg

    rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
    chunk = rsp.read(1024)
    # ``work`` accumulates chunks that do not yet form a complete response.
    work = [chunk]
    while chunk:
        # Keep reading until the latest chunk contains a separator or the
        # stream is exhausted (read() returned an empty chunk).
        while b';' not in chunk and chunk:
            chunk = rsp.read(1024)
            work.append(chunk)
        merged = b''.join(work)
        # Emit every complete response currently buffered.
        while b';' in merged:
            one, merged = merged.split(b';', 1)
            yield unescapearg(one)
        chunk = rsp.read(1024)
        # Carry the unterminated remainder over to the next round.
        work = [merged, chunk]
    # Whatever is left once the stream ends is the final response.
    yield unescapearg(b''.join(work))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def _submitone(self, op, args):
    """Run the single command ``op`` on the server through ``_call``.

    ``args`` is a mapping of argument names to values. It is passed through
    ``pycompat.strkwargs`` and expanded into keyword arguments
    (presumably this converts the keys into a form usable with ``**`` -
    confirm against ``pycompat``).

    Returns whatever ``_call`` returns.
    """
    return self._call(op, **pycompat.strkwargs(args))
def debugwireargs(self, one, two, three=None, four=None, five=None):
    """Send the ``debugwireargs`` command and return the reply of ``_call``.

    ``one`` and ``two`` are always sent. ``three`` and ``four`` are sent
    only when they are not ``None``.

    NOTE(review): ``five`` is accepted but never forwarded to the server;
    this looks deliberate, but confirm before changing it.
    """
    # don't pass optional arguments left at their default value
    opts = {}
    if three is not None:
        opts['three'] = three
    if four is not None:
        opts['four'] = four
    return self._call(b'debugwireargs', one=one, two=two, **opts)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def _call(self, cmd, **args):
    """execute <cmd> on the server

    The command is expected to return a simple string.

    returns the server reply as a string.

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()
def _callstream(self, cmd, **args):
    """execute <cmd> on the server

    The command is expected to return a stream. Note that if the
    command doesn't return a stream, _callstream behaves
    differently for ssh and http peers.

    returns the server reply as a file like object.

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()
def _callcompressable(self, cmd, **args):
    """execute <cmd> on the server

    The command is expected to return a stream.

    The stream may have been compressed in some implementations. This
    function takes care of the decompression. This is the only difference
    with _callstream.

    returns the server reply as a file like object.

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()
def _callpush(self, cmd, fp, **args):
    """execute a <cmd> on server

    The command is expected to be related to a push. Push has a special
    return method.

    returns the server reply as a (ret, output) tuple. ret is either
    empty (error) or a stringified int.

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()
def _calltwowaystream(self, cmd, fp, **args):
    """execute <cmd> on server

    The command will send a stream to the server and get a stream in reply.

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()
def _abort(self, exception):
    """clearly abort the wire protocol connection and raise the exception

    This base implementation always raises ``NotImplementedError``;
    concrete peers are expected to override it.
    """
    raise NotImplementedError()