wireprotov2peer.py
210 lines
| 6.3 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotov2peer.py
Gregory Szorc
|
r37737 | # wireprotov2peer.py - client side code for wire protocol version 2 | ||
# | ||||
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
from .i18n import _ | ||||
from . import ( | ||||
Gregory Szorc
|
r37739 | encoding, | ||
Gregory Szorc
|
r37737 | error, | ||
util, | ||||
wireprotoframing, | ||||
) | ||||
Gregory Szorc
|
r39481 | from .utils import ( | ||
cborutil, | ||||
) | ||||
Gregory Szorc
|
r37737 | |||
Gregory Szorc
|
def formatrichmessage(atoms):
    """Render a sequence of message atoms into a single bytestring.

    Each atom is a mapping with a ``msg`` entry and, optionally, an ``args``
    entry. The message is passed through ``_()`` and, when ``args`` is
    present, %-formatted with those arguments. The rendered pieces are
    concatenated with no separator.
    """
    def render(atom):
        text = _(atom[b'msg'])
        if b'args' not in atom:
            return text
        return text % tuple(atom[b'args'])

    return b''.join([render(atom) for atom in atoms])
Gregory Szorc
|
class commandresponse(object):
    """Holds the buffered response data for a single command request."""

    def __init__(self, requestid, command):
        # Identity of the request this response belongs to.
        self.requestid = requestid
        self.command = command
        # Raw response bytes are accumulated here by the caller.
        self.b = util.bytesio()

    def cborobjects(self):
        """Generate the CBOR values decoded from the buffered response data.

        The buffer position is reset to the start before the buffered bytes
        are handed to the CBOR decoder.
        """
        self.b.seek(0)
        payload = self.b.getvalue()

        for decoded in cborutil.decodeall(payload):
            yield decoded
Gregory Szorc
|
r37738 | |||
Gregory Szorc
|
class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor):
        self._ui = ui
        self._reactor = clientreactor
        # The three maps below are all keyed by request ID.
        self._requests = {}
        self._futures = {}
        self._responses = {}

    def callcommand(self, command, args, f):
        """Register a request to call a command.

        ``f`` is the future that will receive the command's result or
        exception once the response has been processed.

        Returns an iterable of frames that should be sent over the wire.

        Raises ``error.ProgrammingError`` if the reactor asks for any action
        other than ``noop``.
        """
        request, action, meta = self._reactor.callcommand(command, args)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        self._responses[rid] = commandresponse(rid, command)

        # Nothing is sent until flushcommands() is called.
        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.

        Raises ``error.ProgrammingError`` if the reactor asks for any action
        other than ``sendframes``.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readframe(self, fh):
        """Attempt to read and process a frame.

        Returns None if no frame was read. Presumably this means EOF.
        Returns True after a frame has been read and processed.
        """
        frame = wireprotoframing.readframe(fh)
        if frame is None:
            # TODO tell reactor?
            return

        self._ui.note(_('received %r\n') % frame)
        self._processframe(frame)

        return True

    def _processframe(self, frame):
        """Process a single read frame.

        The frame is handed to the reactor, which decides what action to
        take. An ``error`` action is reported to the future tracking the
        request, or raised as ``error.RepoError`` when no such future exists.
        A ``responsedata`` action is forwarded to ``_processresponsedata()``.

        Raises ``error.ProgrammingError`` for frames belonging to an unknown
        request and for actions this class does not handle.
        """
        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
            else:
                raise e

            # The error has been delivered to the future. Stop here so it
            # is not also reported below as an unhandled reactor action.
            return

        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        if action == 'responsedata':
            # Any failures processing this frame should bubble up to the
            # future tracking the request.
            try:
                self._processresponsedata(frame, meta, response)
            except BaseException as e:
                self._futures[frame.requestid].set_exception(e)
        else:
            raise error.ProgrammingError(
                'unhandled action from clientreactor: %s' % action)

    def _processresponsedata(self, frame, meta, response):
        """Buffer response data and resolve the future at end of stream.

        ``meta['data']`` is appended to the response buffer. Once
        ``meta['eos']`` is set, the future for the request is resolved and
        the per-request state held by this object is dropped.
        """
        # This buffers all data until end of stream is received. This
        # is bad for performance.
        # TODO make response data streamable
        response.b.write(meta['data'])

        if not meta['eos']:
            return

        future = self._futures[frame.requestid]

        # If the command has a decoder, resolve the future to the
        # decoded value. Otherwise resolve to the rich response object.
        decoder = COMMAND_DECODERS.get(response.command)

        # TODO consider always resolving the overall status map.
        if decoder:
            objs = response.cborobjects()

            overall = next(objs)

            if overall['status'] == 'ok':
                future.set_result(decoder(objs))
            else:
                # formatrichmessage() looks atoms up by bytes keys, so the
                # atom must be built with bytes keys as well.
                atoms = [{b'msg': overall['error']['message']}]
                if 'args' in overall['error']:
                    atoms[0][b'args'] = overall['error']['args']
                e = error.RepoError(formatrichmessage(atoms))
                future.set_exception(e)
        else:
            future.set_result(response)

        del self._requests[frame.requestid]
        del self._futures[frame.requestid]
        # The response buffer is no longer needed by this object; drop it
        # together with the rest of the per-request state.
        self._responses.pop(frame.requestid, None)
Gregory Szorc
|
def decodebranchmap(objs):
    """Decode the response to the ``branchmap`` command.

    The first object from ``objs`` is consumed; it should be a single CBOR
    map of branch name to array of nodes. Each branch name is converted with
    ``encoding.tolocal()``; the values are passed through unchanged.
    """
    branches = next(objs)

    decoded = {}
    for name, nodes in branches.items():
        decoded[encoding.tolocal(name)] = nodes

    return decoded
Gregory Szorc
|
def decodeheads(objs):
    """Return the first decoded object of a ``heads`` response.

    That object is an array of node bytestrings.
    """
    heads = next(objs)
    return heads
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodeknown(objs):
    """Decode the response to the ``known`` command.

    The first object from ``objs`` is consumed; it is a bytestring where
    each byte is a ``0`` or ``1``.

    Returns a list with one boolean per byte, ``True`` where the byte is
    ``1`` and ``False`` otherwise.
    """
    raw = next(objs)

    # Iterating a bytestring yields 1-character strings on Python 2 but
    # integers on Python 3, so comparing each element against '1' never
    # matches on Python 3. A 1-element slice has the same type as ``raw`` on
    # both versions, so compare slices against a marker of that type.
    one = b'1' if isinstance(raw, (bytes, bytearray)) else u'1'

    return [raw[i:i + 1] == one for i in range(len(raw))]
Gregory Szorc
|
def decodelistkeys(objs):
    """Return the first decoded object of a ``listkeys`` response.

    That object is a map with bytestring keys and values.
    """
    keys = next(objs)
    return keys
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodelookup(objs):
    """Return the first decoded object of a ``lookup`` response unchanged."""
    result = next(objs)
    return result
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodepushkey(objs):
    """Return the first decoded object of a ``pushkey`` response unchanged."""
    result = next(objs)
    return result
Gregory Szorc
|
r37739 | |||
# Maps a command name to the function that decodes its response objects.
# Commands without an entry here have no decoder.
COMMAND_DECODERS = dict(
    branchmap=decodebranchmap,
    heads=decodeheads,
    known=decodeknown,
    listkeys=decodelistkeys,
    lookup=decodelookup,
    pushkey=decodepushkey,
)