##// END OF EJS Templates
narrow: remove hack to write narrowspec to shared .hg directory...
narrow: remove hack to write narrowspec to shared .hg directory AFAIK, we no longer need it since the narrowspec file was move to the store directory in 576eef1ab43d, "narrow: move .hg/narrowspec to .hg/store/narrowspec."

File last commit:

r39522:43d92d68 default
r39593:623081f2 default
Show More
wireprotov2peer.py
210 lines | 6.3 KiB | text/x-python | PythonLexer
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 # wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
from .i18n import _
from . import (
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 encoding,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 error,
util,
wireprotoframing,
)
Gregory Szorc
wireprotov2peer: use our CBOR decoder...
r39481 from .utils import (
cborutil,
)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def formatrichmessage(atoms):
"""Format an encoded message from the framing protocol."""
chunks = []
for atom in atoms:
msg = _(atom[b'msg'])
if b'args' in atom:
Gregory Szorc
wireprotov2peer: properly format errors...
r39522 msg = msg % tuple(atom[b'args'])
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743
chunks.append(msg)
return b''.join(chunks)
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738 class commandresponse(object):
"""Represents the response to a command request."""
def __init__(self, requestid, command):
self.requestid = requestid
self.command = command
self.b = util.bytesio()
def cborobjects(self):
"""Obtain decoded CBOR objects from this response."""
self.b.seek(0)
Gregory Szorc
wireprotov2peer: use our CBOR decoder...
r39481 for v in cborutil.decodeall(self.b.getvalue()):
yield v
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 class clienthandler(object):
"""Object to handle higher-level client activities.
The ``clientreactor`` is used to hold low-level state about the frame-based
protocol, such as which requests and streams are active. This type is used
for higher-level operations, such as reading frames from a socket, exposing
and managing a higher-level primitive for representing command responses,
etc. This class is what peers should probably use to bridge wire activity
with the higher-level peer API.
"""
def __init__(self, ui, clientreactor):
self._ui = ui
self._reactor = clientreactor
self._requests = {}
self._futures = {}
self._responses = {}
def callcommand(self, command, args, f):
"""Register a request to call a command.
Returns an iterable of frames that should be sent over the wire.
"""
request, action, meta = self._reactor.callcommand(command, args)
if action != 'noop':
raise error.ProgrammingError('%s not yet supported' % action)
rid = request.requestid
self._requests[rid] = request
self._futures[rid] = f
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738 self._responses[rid] = commandresponse(rid, command)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
return iter(())
def flushcommands(self):
"""Flush all queued commands.
Returns an iterable of frames that should be sent over the wire.
"""
action, meta = self._reactor.flushcommands()
if action != 'sendframes':
raise error.ProgrammingError('%s not yet supported' % action)
return meta['framegen']
def readframe(self, fh):
"""Attempt to read and process a frame.
Returns None if no frame was read. Presumably this means EOF.
"""
frame = wireprotoframing.readframe(fh)
if frame is None:
# TODO tell reactor?
return
self._ui.note(_('received %r\n') % frame)
self._processframe(frame)
return True
def _processframe(self, frame):
"""Process a single read frame."""
action, meta = self._reactor.onframerecv(frame)
if action == 'error':
e = error.RepoError(meta['message'])
if frame.requestid in self._futures:
self._futures[frame.requestid].set_exception(e)
else:
raise e
if frame.requestid not in self._requests:
raise error.ProgrammingError(
'received frame for unknown request; this is either a bug in '
'the clientreactor not screening for this or this instance was '
'never told about this request: %r' % frame)
response = self._responses[frame.requestid]
if action == 'responsedata':
Gregory Szorc
wireprotov2peer: report exceptions in frame handling against request future...
r39521 # Any failures processing this frame should bubble up to the
# future tracking the request.
try:
self._processresponsedata(frame, meta, response)
except BaseException as e:
self._futures[frame.requestid].set_exception(e)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469 def _processresponsedata(self, frame, meta, response):
# This buffers all data until end of stream is received. This
# is bad for performance.
# TODO make response data streamable
response.b.write(meta['data'])
if meta['eos']:
# If the command has a decoder, resolve the future to the
# decoded value. Otherwise resolve to the rich response object.
decoder = COMMAND_DECODERS.get(response.command)
# TODO consider always resolving the overall status map.
if decoder:
objs = response.cborobjects()
overall = next(objs)
if overall['status'] == 'ok':
self._futures[frame.requestid].set_result(decoder(objs))
else:
Gregory Szorc
wireprotov2peer: properly format errors...
r39522 atoms = [{'msg': overall['error']['message']}]
if 'args' in overall['error']:
atoms[0]['args'] = overall['error']['args']
e = error.RepoError(formatrichmessage(atoms))
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469 self._futures[frame.requestid].set_exception(e)
else:
self._futures[frame.requestid].set_result(response)
del self._requests[frame.requestid]
del self._futures[frame.requestid]
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodebranchmap(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Response should be a single CBOR map of branch name to array of nodes.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 bm = next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
return {encoding.tolocal(k): v for k, v in bm.items()}
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodeheads(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Array of node bytestrings.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodeknown(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Bytestring where each byte is a 0 or 1.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 raw = next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
return [True if c == '1' else False for c in raw]
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodelistkeys(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Map with bytestring keys and values.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodelookup(objs):
return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodepushkey(objs):
return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
COMMAND_DECODERS = {
'branchmap': decodebranchmap,
'heads': decodeheads,
'known': decodeknown,
'listkeys': decodelistkeys,
'lookup': decodelookup,
'pushkey': decodepushkey,
}