##// END OF EJS Templates
exchange: fix locking to actually be scoped...
exchange: fix locking to actually be scoped The previous code was taking locks before entering with statements, so exception before the with statement would not release the lock (except for garbage collection).

File last commit:

r51822:18c8c189 default
r52526:429d5722 stable
Show More
wireprotov1peer.py
660 lines | 21.9 KiB | text/x-python | PythonLexer
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
#
Raphaël Gomès
contributor: change mentions of mpm to olivia...
r47575 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 #
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 import sys
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 import weakref
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 from concurrent import futures
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 from .i18n import _
Augie Fackler
formatting: blacken the codebase...
r43346 from .node import bin
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 from . import (
bundle2,
changegroup as changegroupmod,
encoding,
error,
pushkey as pushkeymod,
pycompat,
util,
wireprototypes,
)
Pulkit Goyal
interfaces: create a new folder for interfaces and move repository.py in it...
r43078 from .interfaces import (
repository,
Pulkit Goyal
interfaceutil: move to interfaces/...
r43079 util as interfaceutil,
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 )
Augie Fackler
core: migrate uses of hashlib.sha1 to hashutil.sha1...
r44517 from .utils import hashutil
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
urlreq = util.urlreq
Augie Fackler
formatting: blacken the codebase...
r43346
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 def batchable(f):
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """annotation for batchable methods
Gregory Szorc
peer: scatter module to the wind (API)...
r37633
Such methods must implement a coroutine as follows:
@batchable
def sample(self, one, two=None):
# Build list of encoded arguments suitable for your wire protocol:
Martin von Zweigbergk
wireprotopeer: clarify some variable names now that we allow snake_case...
r47209 encoded_args = [('one', encode(one),), ('two', encode(two),)]
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 # Return it, along with a function that will receive the result
# from the batched request.
return encoded_args, decode
Gregory Szorc
peer: scatter module to the wind (API)...
r37633
The decorator returns a function which wraps this coroutine as a plain
method, but adds the original method as an attribute called "batchable",
which is used by remotebatch to split the call into separate encoding and
decoding phases.
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
peer: scatter module to the wind (API)...
r37633 def plain(*args, **opts):
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 encoded_args_or_res, decode = f(*args, **opts)
if not decode:
Martin von Zweigbergk
wireprotopeer: clarify some variable names now that we allow snake_case...
r47209 return encoded_args_or_res # a local result in this case
Gregory Szorc
peer: scatter module to the wind (API)...
r37633 self = args[0]
cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 encoded_res = self._submitone(cmd, encoded_args_or_res)
return decode(encoded_res)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
peer: scatter module to the wind (API)...
r37633 setattr(plain, 'batchable', f)
Augie Fackler
wireprotov1peer: forward __name__ of wrapped method in batchable decorator...
r39624 setattr(plain, '__name__', f.__name__)
Gregory Szorc
peer: scatter module to the wind (API)...
r37633 return plain
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 def encodebatchcmds(req):
"""Return a ``cmds`` argument value for the ``batch`` command."""
escapearg = wireprototypes.escapebatcharg
cmds = []
for op, argsdict in req:
# Old servers didn't properly unescape argument names. So prevent
# the sending of argument names that may not be decoded properly by
# servers.
assert all(escapearg(k) == k for k in argsdict)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 args = b','.join(
Gregory Szorc
global: bulk replace simple pycompat.iteritems(x) with x.items()...
r49768 b'%s=%s' % (escapearg(k), escapearg(v)) for k, v in argsdict.items()
Augie Fackler
formatting: blacken the codebase...
r43346 )
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 cmds.append(b'%s %s' % (op, args))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return b';'.join(cmds)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 class unsentfuture(futures.Future):
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 """A Future variation to represent an unsent command.
Because we buffer commands and don't submit them immediately, calling
``result()`` on an unsent future could deadlock. Futures for buffered
commands are represented by this type, which wraps ``result()`` to
call ``sendcommands()``.
"""
def result(self, timeout=None):
if self.done():
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 return futures.Future.result(self, timeout)
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649
self._peerexecutor.sendcommands()
# This looks like it will infinitely recurse. However,
# sendcommands() should modify __class__. This call serves as a check
# on that.
return self.result(timeout)
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 @interfaceutil.implementer(repository.ipeercommandexecutor)
Gregory Szorc
py3: use class X: instead of class X(object):...
r49801 class peerexecutor:
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 def __init__(self, peer):
self._peer = peer
self._sent = False
self._closed = False
self._calls = []
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 self._futures = weakref.WeakSet()
self._responseexecutor = None
self._responsef = None
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
def __enter__(self):
return self
def __exit__(self, exctype, excvalee, exctb):
self.close()
def callcommand(self, command, args):
if self._sent:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.ProgrammingError(
Martin von Zweigbergk
cleanup: join string literals that are already on one line...
r43387 b'callcommand() cannot be used after commands are sent'
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
if self._closed:
Augie Fackler
formatting: blacken the codebase...
r43346 raise error.ProgrammingError(
Martin von Zweigbergk
cleanup: join string literals that are already on one line...
r43387 b'callcommand() cannot be used after close()'
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
# Commands are dispatched through methods on the peer.
fn = getattr(self._peer, pycompat.sysstr(command), None)
if not fn:
raise error.ProgrammingError(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'cannot call command %s: method of same name not available '
b'on peer' % command
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
# Commands are either batchable or they aren't. If a command
# isn't batchable, we send it immediately because the executor
# can no longer accept new commands after a non-batchable command.
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 # If a command is batchable, we queue it for later. But we have
# to account for the case of a non-batchable command arriving after
# a batchable one and refuse to service it.
def addcall():
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 f = futures.Future()
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 self._futures.add(f)
self._calls.append((command, args, fn, f))
return f
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
if getattr(fn, 'batchable', False):
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 f = addcall()
# But since we don't issue it immediately, we wrap its result()
# to trigger sending so we avoid deadlocks.
f.__class__ = unsentfuture
f._peerexecutor = self
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 else:
if self._calls:
raise error.ProgrammingError(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'%s is not batchable and cannot be called on a command '
b'executor along with other commands' % command
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 f = addcall()
# Non-batchable commands can never coexist with another command
# in this executor. So send the command immediately.
self.sendcommands()
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 return f
def sendcommands(self):
if self._sent:
return
if not self._calls:
return
self._sent = True
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 # Unhack any future types so caller seens a clean type and to break
# cycle between us and futures.
for f in self._futures:
if isinstance(f, unsentfuture):
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 f.__class__ = futures.Future
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 f._peerexecutor = None
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 calls = self._calls
# Mainly to destroy references to futures.
self._calls = None
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 # Simple case of a single command. We call it synchronously.
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 if len(calls) == 1:
command, args, fn, f = calls[0]
# Future was cancelled. Ignore it.
if not f.set_running_or_notify_cancel():
return
try:
result = fn(**pycompat.strkwargs(args))
except Exception:
Augie Fackler
py3: paper over differences in future exception handling...
r37687 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 else:
f.set_result(result)
return
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 # Batch commands are a bit harder. First, we have to deal with the
# @batchable coroutine. That's a bit annoying. Furthermore, we also
# need to preserve streaming. i.e. it should be possible for the
# futures to resolve as data is coming in off the wire without having
# to wait for the final byte of the final response. We do this by
# spinning up a thread to read the responses.
requests = []
states = []
for command, args, fn, f in calls:
# Future was cancelled. Ignore it.
if not f.set_running_or_notify_cancel():
continue
try:
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 encoded_args_or_res, decode = fn.batchable(
Augie Fackler
formatting: blacken the codebase...
r43346 fn.__self__, **pycompat.strkwargs(args)
)
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 except Exception:
Augie Fackler
py3: paper over differences in future exception handling...
r37687 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 return
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 if not decode:
Martin von Zweigbergk
wireprotopeer: clarify some variable names now that we allow snake_case...
r47209 f.set_result(encoded_args_or_res)
Valentin Gatien-Baron
wireproto: in batch queries, support queries with immediate responses...
r41085 else:
Martin von Zweigbergk
wireprotopeer: clarify some variable names now that we allow snake_case...
r47209 requests.append((command, encoded_args_or_res))
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 states.append((command, f, batchable, decode))
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649
if not requests:
return
# This will emit responses in order they were executed.
wireresults = self._peer._submitbatch(requests)
# The use of a thread pool executor here is a bit weird for something
# that only spins up a single thread. However, thread management is
# hard and it is easy to encounter race conditions, deadlocks, etc.
# concurrent.futures already solves these problems and its thread pool
# executor has minimal overhead. So we use it.
Augie Fackler
cleanup: directly use concurrent.futures instead of via pycompat...
r49690 self._responseexecutor = futures.ThreadPoolExecutor(1)
Augie Fackler
formatting: blacken the codebase...
r43346 self._responsef = self._responseexecutor.submit(
self._readbatchresponse, states, wireresults
)
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648
def close(self):
self.sendcommands()
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 if self._closed:
return
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 self._closed = True
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 if not self._responsef:
return
# We need to wait on our in-flight response and then shut down the
# executor once we have a result.
try:
self._responsef.result()
finally:
self._responseexecutor.shutdown(wait=True)
self._responsef = None
self._responseexecutor = None
# If any of our futures are still in progress, mark them as
# errored. Otherwise a result() could wait indefinitely.
for f in self._futures:
if not f.done():
Augie Fackler
formatting: blacken the codebase...
r43346 f.set_exception(
error.ResponseError(
Valentin Gatien-Baron
wireprotov1peer: don't raise internal errors in some cases...
r47430 _(b'unfulfilled batch command response'), None
Augie Fackler
formatting: blacken the codebase...
r43346 )
)
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649
self._futures = None
def _readbatchresponse(self, states, wireresults):
# Executes in a thread to read data off the wire.
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 for command, f, batchable, decode in states:
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649 # Grab raw result off the wire and teach the internal future
# about it.
Valentin Gatien-Baron
wireprotov1peer: don't raise internal errors in some cases...
r47430 try:
remoteresult = next(wireresults)
except StopIteration:
# This can happen in particular because next(batchable)
# in the previous iteration can call peer._abort, which
# may close the peer.
f.set_exception(
error.ResponseError(
_(b'unfulfilled batch command response'), None
)
)
else:
try:
Valentin Gatien-Baron
wireprotov1peer: simplify the way batchable rpcs are defined...
r48686 result = decode(remoteresult)
Valentin Gatien-Baron
wireprotov1peer: don't raise internal errors in some cases...
r47430 except Exception:
pycompat.future_set_exception_info(f, sys.exc_info()[1:])
else:
f.set_result(result)
Gregory Szorc
wireproto: implement batching on peer executor interface...
r37649
Augie Fackler
formatting: blacken the codebase...
r43346
@interfaceutil.implementer(
repository.ipeercommands, repository.ipeerlegacycommands
)
Gregory Szorc
wireproto: convert legacy commands to command executor...
r37653 class wirepeer(repository.peer):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 """Client-side interface for communicating with a peer repository.
Methods commonly call wire protocol commands of the same name.
See also httppeer.py and sshpeer.py for protocol-specific
implementations of this interface.
"""
Augie Fackler
formatting: blacken the codebase...
r43346
Gregory Szorc
wireproto: implement command executor interface for version 1 peers...
r37648 def commandexecutor(self):
return peerexecutor(self)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 # Begin of ipeercommands interface.
Gregory Szorc
wireproto: properly call clonebundles command...
r37667 def clonebundles(self):
clonebundles: introduce a new write protocol command...
r51594 if self.capable(b'clonebundles_manifest'):
return self._call(b'clonebundles_manifest')
else:
self.requirecap(b'clonebundles', _(b'clone bundles'))
return self._call(b'clonebundles')
Gregory Szorc
wireproto: properly call clonebundles command...
r37667
Mathias De Mare
clonebundles: add support for inline (streaming) clonebundles...
r51559 def _finish_inline_clone_bundle(self, stream):
pass # allow override for httppeer
clone-bundle: rename the methods and wireprotole command...
r51592 def get_cached_bundle_inline(self, path):
stream = self._callstream(b"get_cached_bundle_inline", path=path)
Mathias De Mare
clonebundles: add support for inline (streaming) clonebundles...
r51559 length = util.uvarintdecodestream(stream)
# SSH streams will block if reading more than length
for chunk in util.filechunkiter(stream, limit=length):
yield chunk
self._finish_inline_clone_bundle(stream)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 @batchable
def lookup(self, key):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.requirecap(b'lookup', _(b'look up remote revision'))
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687
def decode(d):
success, data = d[:-1].split(b" ", 1)
if int(success):
return bin(data)
else:
self._abort(error.RepoError(data))
return {b'key': encoding.fromlocal(key)}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def heads(self):
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 def decode(d):
try:
return wireprototypes.decodelist(d[:-1])
except ValueError:
self._abort(error.ResponseError(_(b"unexpected response:"), d))
return {}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def known(self, nodes):
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 def decode(d):
try:
return [bool(int(b)) for b in pycompat.iterbytestr(d)]
except ValueError:
self._abort(error.ResponseError(_(b"unexpected response:"), d))
return {b'nodes': wireprototypes.encodelist(nodes)}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def branchmap(self):
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 def decode(d):
try:
branchmap = {}
for branchpart in d.splitlines():
branchname, branchheads = branchpart.split(b' ', 1)
branchname = encoding.tolocal(urlreq.unquote(branchname))
branchheads = wireprototypes.decodelist(branchheads)
branchmap[branchname] = branchheads
return branchmap
except TypeError:
self._abort(error.ResponseError(_(b"unexpected response:"), d))
return {}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def listkeys(self, namespace):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if not self.capable(b'pushkey'):
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 return {}, None
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687
def decode(d):
self.ui.debug(
b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
)
return pushkeymod.decodekeys(d)
return {b'namespace': encoding.fromlocal(namespace)}, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
@batchable
def pushkey(self, namespace, key, old, new):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if not self.capable(b'pushkey'):
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 return False, None
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687
def decode(d):
d, output = d.split(b'\n', 1)
try:
d = bool(int(d))
except ValueError:
raise error.ResponseError(
_(b'push failed (unexpected response):'), d
)
for l in output.splitlines(True):
self.ui.status(_(b'remote: '), l)
return d
return {
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'namespace': encoding.fromlocal(namespace),
b'key': encoding.fromlocal(key),
b'old': encoding.fromlocal(old),
b'new': encoding.fromlocal(new),
Valentin Gatien-Baron
wireprotov1peer: update all rpcs to use the new batchable scheme...
r48687 }, decode
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def stream_out(self):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return self._callstream(b'stream_out')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def getbundle(self, source, **kwargs):
kwargs = pycompat.byteskwargs(kwargs)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.requirecap(b'getbundle', _(b'look up remote changes'))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 opts = {}
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 bundlecaps = kwargs.get(b'bundlecaps') or set()
Gregory Szorc
global: bulk replace simple pycompat.iteritems(x) with x.items()...
r49768 for key, value in kwargs.items():
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 if value is None:
continue
keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
if keytype is None:
raise error.ProgrammingError(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b'Unexpectedly None keytype for key %s' % key
Augie Fackler
formatting: blacken the codebase...
r43346 )
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 elif keytype == b'nodes':
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 value = wireprototypes.encodelist(value)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 elif keytype == b'csv':
value = b','.join(value)
elif keytype == b'scsv':
value = b','.join(sorted(value))
elif keytype == b'boolean':
value = b'%i' % bool(value)
elif keytype != b'plain':
raise KeyError(b'unknown getbundle option type %s' % keytype)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 opts[key] = value
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
if any((cap.startswith(b'HG2') for cap in bundlecaps)):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 return bundle2.getunbundler(self.ui, f)
else:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return changegroupmod.cg1unpacker(f, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Gregory Szorc
wireproto: use command executor for unbundle...
r37664 def unbundle(self, bundle, heads, url):
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """Send cg (a readable file-like object representing the
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 changegroup to push, typically a chunkbuffer object) to the
remote server as a bundle.
When pushing a bundle10 stream, return an integer indicating the
result of the push (see changegroup.apply()).
When pushing a bundle20 stream, return a bundle20 stream.
`url` is the url the client thinks it's pushing to, which is
visible to hooks.
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if heads != [b'force'] and self.capable(b'unbundlehash'):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 heads = wireprototypes.encodelist(
Augie Fackler
core: migrate uses of hashlib.sha1 to hashutil.sha1...
r44517 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 else:
heads = wireprototypes.encodelist(heads)
safehasattr: drop usage in favor of hasattr...
r51821 if hasattr(bundle, 'deltaheader'):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 # this a bundle10, do the old style call sequence
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
if ret == b"":
raise error.ResponseError(_(b'push failed:'), output)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 try:
ret = int(ret)
except ValueError:
raise error.ResponseError(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 _(b'push failed (unexpected response):'), ret
Augie Fackler
formatting: blacken the codebase...
r43346 )
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
for l in output.splitlines(True):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.ui.status(_(b'remote: '), l)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 else:
# bundle2 push. Send a stream, fetch a stream.
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 ret = bundle2.getunbundler(self.ui, stream)
return ret
# End of ipeercommands interface.
# Begin of ipeerlegacycommands interface.
def branches(self, nodes):
n = wireprototypes.encodelist(nodes)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 d = self._call(b"branches", nodes=n)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 try:
br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
return br
except ValueError:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self._abort(error.ResponseError(_(b"unexpected response:"), d))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def between(self, pairs):
Augie Fackler
formatting: blacken the codebase...
r43346 batch = 8 # avoid giant requests
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 r = []
Manuel Jacob
py3: replace `pycompat.xrange` by `range`
r50179 for i in range(0, len(pairs), batch):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 n = b" ".join(
Augie Fackler
formatting: blacken the codebase...
r43346 [
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 wireprototypes.encodelist(p, b'-')
Augie Fackler
formatting: blacken the codebase...
r43346 for p in pairs[i : i + batch]
]
)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 d = self._call(b"between", pairs=n)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 try:
Augie Fackler
formatting: blacken the codebase...
r43346 r.extend(
l and wireprototypes.decodelist(l) or []
for l in d.splitlines()
)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 except ValueError:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self._abort(error.ResponseError(_(b"unexpected response:"), d))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 return r
Gregory Szorc
wireproto: convert legacy commands to command executor...
r37653 def changegroup(self, nodes, source):
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 n = wireprototypes.encodelist(nodes)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 f = self._callcompressable(b"changegroup", roots=n)
return changegroupmod.cg1unpacker(f, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
Gregory Szorc
wireproto: convert legacy commands to command executor...
r37653 def changegroupsubset(self, bases, heads, source):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 bases = wireprototypes.encodelist(bases)
heads = wireprototypes.encodelist(heads)
Augie Fackler
formatting: blacken the codebase...
r43346 f = self._callcompressable(
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 b"changegroupsubset", bases=bases, heads=heads
Augie Fackler
formatting: blacken the codebase...
r43346 )
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return changegroupmod.cg1unpacker(f, b'UN')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
# End of ipeerlegacycommands interface.
def _submitbatch(self, req):
"""run batch request <req> on the server
Returns an iterator of the raw responses from the server.
"""
ui = self.ui
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
ui.debug(b'devel-peer-request: batched-content\n')
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 for op, args in req:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 msg = b'devel-peer-request: - %s (%d arguments)\n'
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 ui.debug(msg % (op, len(args)))
unescapearg = wireprototypes.unescapebatcharg
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 chunk = rsp.read(1024)
work = [chunk]
while chunk:
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 while b';' not in chunk and chunk:
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 chunk = rsp.read(1024)
work.append(chunk)
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 merged = b''.join(work)
while b';' in merged:
one, merged = merged.split(b';', 1)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 yield unescapearg(one)
chunk = rsp.read(1024)
work = [merged, chunk]
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 yield unescapearg(b''.join(work))
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def _submitone(self, op, args):
return self._call(op, **pycompat.strkwargs(args))
def debugwireargs(self, one, two, three=None, four=None, five=None):
# don't pass optional arguments left at their default value
opts = {}
if three is not None:
Augie Fackler
cleanup: remove pointless r-prefixes on single-quoted strings...
r43906 opts['three'] = three
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 if four is not None:
Augie Fackler
cleanup: remove pointless r-prefixes on single-quoted strings...
r43906 opts['four'] = four
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 return self._call(b'debugwireargs', one=one, two=two, **opts)
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632
def _call(self, cmd, **args):
"""execute <cmd> on the server
The command is expected to return a simple string.
returns the server reply as a string."""
raise NotImplementedError()
def _callstream(self, cmd, **args):
"""execute <cmd> on the server
The command is expected to return a stream. Note that if the
command doesn't return a stream, _callstream behaves
differently for ssh and http peers.
returns the server reply as a file like object.
"""
raise NotImplementedError()
def _callcompressable(self, cmd, **args):
"""execute <cmd> on the server
The command is expected to return a stream.
The stream may have been compressed in some implementations. This
function takes care of the decompression. This is the only difference
with _callstream.
returns the server reply as a file like object.
"""
raise NotImplementedError()
def _callpush(self, cmd, fp, **args):
"""execute a <cmd> on server
The command is expected to be related to a push. Push has a special
return method.
returns the server reply as a (ret, output) tuple. ret is either
empty (error) or a stringified int.
"""
raise NotImplementedError()
def _calltwowaystream(self, cmd, fp, **args):
"""execute <cmd> on server
The command will send a stream to the server and get a stream in reply.
"""
raise NotImplementedError()
def _abort(self, exception):
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """clearly abort the wire protocol connection and raise the exception"""
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 raise NotImplementedError()