##// END OF EJS Templates
wireprotov2: establish a type for representing command response...
wireprotov2: establish a type for representing command response It will be desirable to have a higher-level type for representing command responses. This will allow us to do nicer things. For now, the instance encapsulates existing logic. It is still a bit primitive. But we're slowly making things better. Version 1 protocols have a wrapping layer that decodes the raw string data into a data structure and that data structure is sent to the future. Version 2 doesn't yet have this layer and the future is receiving the raw wire response. Hence why debugcommands needed to be taught about the response type. Differential Revision: https://phab.mercurial-scm.org/D3380

File last commit:

r37738:d715a850 default
r37738:d715a850 default
Show More
wireprotov2peer.py
137 lines | 4.1 KiB | text/x-python | PythonLexer
# wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
from .i18n import _
from .thirdparty import (
cbor,
)
from . import (
error,
util,
wireprotoframing,
)
class commandresponse(object):
"""Represents the response to a command request."""
def __init__(self, requestid, command):
self.requestid = requestid
self.command = command
self.cbor = False
self.b = util.bytesio()
def cborobjects(self):
"""Obtain decoded CBOR objects from this response."""
size = self.b.tell()
self.b.seek(0)
decoder = cbor.CBORDecoder(self.b)
while self.b.tell() < size:
yield decoder.decode()
class clienthandler(object):
"""Object to handle higher-level client activities.
The ``clientreactor`` is used to hold low-level state about the frame-based
protocol, such as which requests and streams are active. This type is used
for higher-level operations, such as reading frames from a socket, exposing
and managing a higher-level primitive for representing command responses,
etc. This class is what peers should probably use to bridge wire activity
with the higher-level peer API.
"""
def __init__(self, ui, clientreactor):
self._ui = ui
self._reactor = clientreactor
self._requests = {}
self._futures = {}
self._responses = {}
def callcommand(self, command, args, f):
"""Register a request to call a command.
Returns an iterable of frames that should be sent over the wire.
"""
request, action, meta = self._reactor.callcommand(command, args)
if action != 'noop':
raise error.ProgrammingError('%s not yet supported' % action)
rid = request.requestid
self._requests[rid] = request
self._futures[rid] = f
self._responses[rid] = commandresponse(rid, command)
return iter(())
def flushcommands(self):
"""Flush all queued commands.
Returns an iterable of frames that should be sent over the wire.
"""
action, meta = self._reactor.flushcommands()
if action != 'sendframes':
raise error.ProgrammingError('%s not yet supported' % action)
return meta['framegen']
def readframe(self, fh):
"""Attempt to read and process a frame.
Returns None if no frame was read. Presumably this means EOF.
"""
frame = wireprotoframing.readframe(fh)
if frame is None:
# TODO tell reactor?
return
self._ui.note(_('received %r\n') % frame)
self._processframe(frame)
return True
def _processframe(self, frame):
"""Process a single read frame."""
action, meta = self._reactor.onframerecv(frame)
if action == 'error':
e = error.RepoError(meta['message'])
if frame.requestid in self._futures:
self._futures[frame.requestid].set_exception(e)
else:
raise e
if frame.requestid not in self._requests:
raise error.ProgrammingError(
'received frame for unknown request; this is either a bug in '
'the clientreactor not screening for this or this instance was '
'never told about this request: %r' % frame)
response = self._responses[frame.requestid]
if action == 'responsedata':
response.b.write(meta['data'])
if meta['cbor']:
response.cbor = True
if meta['eos']:
self._futures[frame.requestid].set_result(response)
del self._requests[frame.requestid]
del self._futures[frame.requestid]
else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)