diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py --- a/mercurial/httppeer.py +++ b/mercurial/httppeer.py @@ -13,7 +13,6 @@ import io import os import socket import struct -import sys import tempfile import weakref @@ -36,6 +35,7 @@ from . import ( wireprotoframing, wireprototypes, wireprotov1peer, + wireprotov2peer, wireprotov2server, ) @@ -522,6 +522,8 @@ def sendv2request(ui, opener, requestbui reactor = wireprotoframing.clientreactor(hasmultiplesend=False, buffersends=True) + handler = wireprotov2peer.clienthandler(ui, reactor) + url = '%s/%s' % (apiurl, permission) if len(requests) > 1: @@ -529,20 +531,11 @@ def sendv2request(ui, opener, requestbui else: url += '/%s' % requests[0][0] - # Request ID to (request, future) - requestmap = {} - for command, args, f in requests: - request, action, meta = reactor.callcommand(command, args) - assert action == 'noop' - - requestmap[request.requestid] = (request, f) - - action, meta = reactor.flushcommands() - assert action == 'sendframes' + assert not list(handler.callcommand(command, args, f)) # TODO stream this. - body = b''.join(map(bytes, meta['framegen'])) + body = b''.join(map(bytes, handler.flushcommands())) # TODO modify user-agent to reflect v2 headers = { @@ -564,7 +557,7 @@ def sendv2request(ui, opener, requestbui ui.traceback() raise IOError(None, e) - return reactor, requestmap, res + return handler, res class queuedcommandfuture(pycompat.futures.Future): """Wraps result() on command futures to trigger submission on call.""" @@ -684,7 +677,7 @@ class httpv2executor(object): 'pull': 'ro', }[permissions.pop()] - reactor, requests, resp = sendv2request( + handler, resp = sendv2request( self._ui, self._opener, self._requestbuilder, self._apiurl, permission, calls) @@ -692,9 +685,7 @@ class httpv2executor(object): self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) self._responsef = self._responseexecutor.submit(self._handleresponse, - reactor, - requests, - resp) + handler, resp) def close(self): if self._closed: @@ -723,62 +714,11 @@ class httpv2executor(object): self._futures = None - def _handleresponse(self, reactor, requests, resp): + def _handleresponse(self, handler, resp): # Called in a thread to read the response. - results = {k: [] for k in requests} - - while True: - frame = wireprotoframing.readframe(resp) - if frame is None: - break - - self._ui.note(_('received %r\n') % frame) - - # Guard against receiving a frame with a request ID that we - # didn't issue. This should never happen. - request, f = requests.get(frame.requestid, [None, None]) - - action, meta = reactor.onframerecv(frame) - - if action == 'responsedata': - assert request.requestid == meta['request'].requestid - - result = results[request.requestid] - - if meta['cbor']: - payload = util.bytesio(meta['data']) - - decoder = cbor.CBORDecoder(payload) - while payload.tell() + 1 < len(meta['data']): - try: - result.append(decoder.decode()) - except Exception: - pycompat.future_set_exception_info( - f, sys.exc_info()[1:]) - continue - else: - result.append(meta['data']) - - if meta['eos']: - f.set_result(result) - del results[request.requestid] - - elif action == 'error': - e = error.RepoError(meta['message']) - - if f: - f.set_exception(e) - else: - raise e - - else: - e = error.ProgrammingError('unhandled action: %s' % action) - - if f: - f.set_exception(e) - else: - raise e + while handler.readframe(resp): + pass # TODO implement interface for version 2 peers @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py new file mode 100644 --- /dev/null +++ b/mercurial/wireprotov2peer.py @@ -0,0 +1,135 @@ +# wireprotov2peer.py - client side code for wire protocol version 2 +# +# Copyright 2018 Gregory Szorc +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +from __future__ import absolute_import + +from .i18n import _ +from .thirdparty import ( + cbor, +) +from . import ( + error, + util, + wireprotoframing, +) + +class clienthandler(object): + """Object to handle higher-level client activities. + + The ``clientreactor`` is used to hold low-level state about the frame-based + protocol, such as which requests and streams are active. This type is used + for higher-level operations, such as reading frames from a socket, exposing + and managing a higher-level primitive for representing command responses, + etc. This class is what peers should probably use to bridge wire activity + with the higher-level peer API. + """ + + def __init__(self, ui, clientreactor): + self._ui = ui + self._reactor = clientreactor + self._requests = {} + self._futures = {} + self._responses = {} + + def callcommand(self, command, args, f): + """Register a request to call a command. + + Returns an iterable of frames that should be sent over the wire. + """ + request, action, meta = self._reactor.callcommand(command, args) + + if action != 'noop': + raise error.ProgrammingError('%s not yet supported' % action) + + rid = request.requestid + self._requests[rid] = request + self._futures[rid] = f + self._responses[rid] = { + 'cbor': False, + 'b': util.bytesio(), + } + + return iter(()) + + def flushcommands(self): + """Flush all queued commands. + + Returns an iterable of frames that should be sent over the wire. + """ + action, meta = self._reactor.flushcommands() + + if action != 'sendframes': + raise error.ProgrammingError('%s not yet supported' % action) + + return meta['framegen'] + + def readframe(self, fh): + """Attempt to read and process a frame. + + Returns None if no frame was read. Presumably this means EOF. + """ + frame = wireprotoframing.readframe(fh) + if frame is None: + # TODO tell reactor? + return + + self._ui.note(_('received %r\n') % frame) + self._processframe(frame) + + return True + + def _processframe(self, frame): + """Process a single read frame.""" + + action, meta = self._reactor.onframerecv(frame) + + if action == 'error': + e = error.RepoError(meta['message']) + + if frame.requestid in self._futures: + self._futures[frame.requestid].set_exception(e) + else: + raise e + + if frame.requestid not in self._requests: + raise error.ProgrammingError( + 'received frame for unknown request; this is either a bug in ' + 'the clientreactor not screening for this or this instance was ' + 'never told about this request: %r' % frame) + + response = self._responses[frame.requestid] + + if action == 'responsedata': + response['b'].write(meta['data']) + + if meta['cbor']: + response['cbor'] = True + + if meta['eos']: + if meta['cbor']: + # If CBOR, decode every object. + b = response['b'] + + size = b.tell() + b.seek(0) + + decoder = cbor.CBORDecoder(b) + + result = [] + while b.tell() < size: + result.append(decoder.decode()) + else: + result = [response['b'].getvalue()] + + self._futures[frame.requestid].set_result(result) + + del self._requests[frame.requestid] + del self._futures[frame.requestid] + + else: + raise error.ProgrammingError( + 'unhandled action from clientreactor: %s' % action) diff --git a/tests/test-wireproto-command-known.t b/tests/test-wireproto-command-known.t --- a/tests/test-wireproto-command-known.t +++ b/tests/test-wireproto-command-known.t @@ -50,7 +50,7 @@ No arguments returns something reasonabl received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) s> 0\r\n s> \r\n - response: [] + response: [b''] Single known node works diff --git a/tests/test-wireproto-command-pushkey.t b/tests/test-wireproto-command-pushkey.t --- a/tests/test-wireproto-command-pushkey.t +++ b/tests/test-wireproto-command-pushkey.t @@ -53,7 +53,7 @@ pushkey for a bookmark works received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob) s> 0\r\n s> \r\n - response: [] + response: [True] $ sendhttpv2peer << EOF > command listkeys