##// END OF EJS Templates
wireprotov2: don't emit empty frames...
wireprotov2: don't emit empty frames Staring at logs revealed the presence of empty frames that should have contained payload. Let's stop that from happening. Differential Revision: https://phab.mercurial-scm.org/D4925

File last commit:

r40169:cfeba1aa default
r40171:3a6d6c54 default
Show More
wireprotov2peer.py
500 lines | 16.5 KiB | text/x-python | PythonLexer
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 # wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 import threading
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 from .i18n import _
from . import (
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 encoding,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 error,
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 pycompat,
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 sslutil,
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 url as urlmod,
util,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 wireprotoframing,
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 wireprototypes,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 )
Gregory Szorc
wireprotov2peer: use our CBOR decoder...
r39481 from .utils import (
cborutil,
)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def formatrichmessage(atoms):
"""Format an encoded message from the framing protocol."""
chunks = []
for atom in atoms:
msg = _(atom[b'msg'])
if b'args' in atom:
Gregory Szorc
wireprotov2peer: properly format errors...
r39522 msg = msg % tuple(atom[b'args'])
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743
chunks.append(msg)
return b''.join(chunks)
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 SUPPORTED_REDIRECT_PROTOCOLS = {
b'http',
b'https',
}
SUPPORTED_CONTENT_HASHES = {
b'sha1',
b'sha256',
}
def redirecttargetsupported(ui, target):
"""Determine whether a redirect target entry is supported.
``target`` should come from the capabilities data structure emitted by
the server.
"""
if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
% (target[b'name'], target.get(b'protocol', b'')))
return False
if target.get(b'snirequired') and not sslutil.hassni:
ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
target[b'name'])
return False
if b'tlsversions' in target:
tlsversions = set(target[b'tlsversions'])
supported = set()
for v in sslutil.supportedprotocols:
assert v.startswith(b'tls')
supported.add(v[3:])
if not tlsversions & supported:
ui.note(_('(remote redirect target %s requires unsupported TLS '
'versions: %s)\n') % (
target[b'name'], b', '.join(sorted(tlsversions))))
return False
ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
return True
def supportedredirects(ui, apidescriptor):
"""Resolve the "redirect" command request key given an API descriptor.
Given an API descriptor returned by the server, returns a data structure
that can be used in hte "redirect" field of command requests to advertise
support for compatible redirect targets.
Returns None if no redirect targets are remotely advertised or if none are
supported.
"""
if not apidescriptor or b'redirect' not in apidescriptor:
return None
targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
if redirecttargetsupported(ui, t)]
hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
if h in SUPPORTED_CONTENT_HASHES]
return {
b'targets': targets,
b'hashes': hashes,
}
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738 class commandresponse(object):
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 """Represents the response to a command request.
Instances track the state of the command and hold its results.
An external entity is required to update the state of the object when
events occur.
"""
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 def __init__(self, requestid, command, fromredirect=False):
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738 self.requestid = requestid
self.command = command
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 self.fromredirect = fromredirect
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 # Whether all remote input related to this command has been
# received.
self._inputcomplete = False
# We have a lock that is acquired when important object state is
# mutated. This is to prevent race conditions between 1 thread
# sending us new data and another consuming it.
self._lock = threading.RLock()
# An event is set when state of the object changes. This event
# is waited on by the generator emitting objects.
self._serviceable = threading.Event()
self._pendingevents = []
self._decoder = cborutil.bufferingdecoder()
self._seeninitial = False
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 self._redirect = None
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597
def _oninputcomplete(self):
with self._lock:
self._inputcomplete = True
self._serviceable.set()
def _onresponsedata(self, data):
available, readcount, wanted = self._decoder.decode(data)
if not available:
return
with self._lock:
for o in self._decoder.getavailable():
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 if not self._seeninitial and not self.fromredirect:
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 self._handleinitial(o)
continue
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 # We should never see an object after a content redirect,
# as the spec says the main status object containing the
# content redirect is the only object in the stream. Fail
# if we see a misbehaving server.
if self._redirect:
raise error.Abort(_('received unexpected response data '
'after content redirect; the remote is '
'buggy'))
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 self._pendingevents.append(o)
self._serviceable.set()
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 def _handleinitial(self, o):
self._seeninitial = True
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 if o[b'status'] == b'ok':
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 return
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 elif o[b'status'] == b'redirect':
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 l = o[b'location']
self._redirect = wireprototypes.alternatelocationresponse(
url=l[b'url'],
mediatype=l[b'mediatype'],
size=l.get(b'size'),
fullhashes=l.get(b'fullhashes'),
fullhashseed=l.get(b'fullhashseed'),
serverdercerts=l.get(b'serverdercerts'),
servercadercerts=l.get(b'servercadercerts'))
return
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 atoms = [{'msg': o[b'error'][b'message']}]
if b'args' in o[b'error']:
atoms[0]['args'] = o[b'error'][b'args']
raise error.RepoError(formatrichmessage(atoms))
def objects(self):
"""Obtained decoded objects from this response.
This is a generator of data structures that were decoded from the
command response.
Obtaining the next member of the generator may block due to waiting
on external data to become available.
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 If the server encountered an error in the middle of serving the data
or if another error occurred, an exception may be raised when
advancing the generator.
"""
while True:
# TODO this can infinite loop if self._inputcomplete is never
# set. We likely want to tie the lifetime of this object/state
# to that of the background thread receiving frames and updating
# our state.
self._serviceable.wait(1.0)
with self._lock:
self._serviceable.clear()
# Make copies because objects could be mutated during
# iteration.
stop = self._inputcomplete
pending = list(self._pendingevents)
self._pendingevents[:] = []
for o in pending:
yield o
if stop:
break
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 class clienthandler(object):
"""Object to handle higher-level client activities.
The ``clientreactor`` is used to hold low-level state about the frame-based
protocol, such as which requests and streams are active. This type is used
for higher-level operations, such as reading frames from a socket, exposing
and managing a higher-level primitive for representing command responses,
etc. This class is what peers should probably use to bridge wire activity
with the higher-level peer API.
"""
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 def __init__(self, ui, clientreactor, opener=None,
requestbuilder=util.urlreq.request):
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 self._ui = ui
self._reactor = clientreactor
self._requests = {}
self._futures = {}
self._responses = {}
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 self._redirects = []
Gregory Szorc
wireprotov2: change name and behavior of readframe()...
r40055 self._frameseof = False
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 self._opener = opener or urlmod.opener(ui)
self._requestbuilder = requestbuilder
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 def callcommand(self, command, args, f, redirect=None):
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 """Register a request to call a command.
Returns an iterable of frames that should be sent over the wire.
"""
Gregory Szorc
wireprotov2: client support for advertising redirect targets...
r40060 request, action, meta = self._reactor.callcommand(command, args,
redirect=redirect)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
if action != 'noop':
raise error.ProgrammingError('%s not yet supported' % action)
rid = request.requestid
self._requests[rid] = request
self._futures[rid] = f
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 # TODO we need some kind of lifetime on response instances otherwise
# objects() may deadlock.
Gregory Szorc
wireprotov2: establish a type for representing command response...
r37738 self._responses[rid] = commandresponse(rid, command)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
return iter(())
def flushcommands(self):
"""Flush all queued commands.
Returns an iterable of frames that should be sent over the wire.
"""
action, meta = self._reactor.flushcommands()
if action != 'sendframes':
raise error.ProgrammingError('%s not yet supported' % action)
return meta['framegen']
Gregory Szorc
wireprotov2: change name and behavior of readframe()...
r40055 def readdata(self, framefh):
"""Attempt to read data and do work.
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
Gregory Szorc
wireprotov2: change name and behavior of readframe()...
r40055 Returns None if no data was read. Presumably this means we're
done with all read I/O.
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 """
Gregory Szorc
wireprotov2: change name and behavior of readframe()...
r40055 if not self._frameseof:
frame = wireprotoframing.readframe(framefh)
if frame is None:
# TODO tell reactor?
self._frameseof = True
else:
self._ui.note(_('received %r\n') % frame)
self._processframe(frame)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 # Also try to read the first redirect.
if self._redirects:
if not self._processredirect(*self._redirects[0]):
self._redirects.pop(0)
if self._frameseof and not self._redirects:
Gregory Szorc
wireprotov2: change name and behavior of readframe()...
r40055 return None
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737
return True
def _processframe(self, frame):
"""Process a single read frame."""
action, meta = self._reactor.onframerecv(frame)
if action == 'error':
e = error.RepoError(meta['message'])
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 if frame.requestid in self._responses:
self._responses[frame.requestid]._oninputcomplete()
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 if frame.requestid in self._futures:
self._futures[frame.requestid].set_exception(e)
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 del self._futures[frame.requestid]
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 else:
raise e
Gregory Szorc
wireprotov2: implement commands as a generator of objects...
r39595 return
Gregory Szorc
wireprotov2: handle noop action...
r40169 elif action == 'noop':
return
Gregory Szorc
wireprotov2: implement commands as a generator of objects...
r39595
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 if frame.requestid not in self._requests:
raise error.ProgrammingError(
'received frame for unknown request; this is either a bug in '
'the clientreactor not screening for this or this instance was '
'never told about this request: %r' % frame)
response = self._responses[frame.requestid]
if action == 'responsedata':
Gregory Szorc
wireprotov2peer: report exceptions in frame handling against request future...
r39521 # Any failures processing this frame should bubble up to the
# future tracking the request.
try:
self._processresponsedata(frame, meta, response)
except BaseException as e:
self._futures[frame.requestid].set_exception(e)
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 del self._futures[frame.requestid]
response._oninputcomplete()
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469 def _processresponsedata(self, frame, meta, response):
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 # This can raise. The caller can handle it.
response._onresponsedata(meta['data'])
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 # If we got a content redirect response, we want to fetch it and
# expose the data as if we received it inline. But we also want to
# keep our internal request accounting in order. Our strategy is to
# basically put meaningful response handling on pause until EOS occurs
# and the stream accounting is in a good state. At that point, we follow
# the redirect and replace the response object with its data.
redirect = response._redirect
handlefuture = False if redirect else True
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469 if meta['eos']:
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 response._oninputcomplete()
del self._requests[frame.requestid]
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 if redirect:
self._followredirect(frame.requestid, redirect)
return
if not handlefuture:
return
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 # If the command has a decoder, we wait until all input has been
# received before resolving the future. Otherwise we resolve the
# future immediately.
if frame.requestid not in self._futures:
return
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469
Gregory Szorc
wireprotov2peer: stream decoded responses...
r39597 if response.command not in COMMAND_DECODERS:
self._futures[frame.requestid].set_result(response.objects())
del self._futures[frame.requestid]
elif response._inputcomplete:
decoded = COMMAND_DECODERS[response.command](response.objects())
self._futures[frame.requestid].set_result(decoded)
Gregory Szorc
wireprotov2peer: split responsedata handling into separate function...
r39469 del self._futures[frame.requestid]
Gregory Szorc
wireprotov2: client support for following content redirects...
r40062 def _followredirect(self, requestid, redirect):
"""Called to initiate redirect following for a request."""
self._ui.note(_('(following redirect to %s)\n') % redirect.url)
# TODO handle framed responses.
if redirect.mediatype != b'application/mercurial-cbor':
raise error.Abort(_('cannot handle redirects for the %s media type')
% redirect.mediatype)
if redirect.fullhashes:
self._ui.warn(_('(support for validating hashes on content '
'redirects not supported)\n'))
if redirect.serverdercerts or redirect.servercadercerts:
self._ui.warn(_('(support for pinning server certificates on '
'content redirects not supported)\n'))
headers = {
r'Accept': redirect.mediatype,
}
req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
try:
res = self._opener.open(req)
except util.urlerr.httperror as e:
if e.code == 401:
raise error.Abort(_('authorization failed'))
raise
except util.httplib.HTTPException as e:
self._ui.debug('http error requesting %s\n' % req.get_full_url())
self._ui.traceback()
raise IOError(None, e)
urlmod.wrapresponse(res)
# The existing response object is associated with frame data. Rather
# than try to normalize its state, just create a new object.
oldresponse = self._responses[requestid]
self._responses[requestid] = commandresponse(requestid,
oldresponse.command,
fromredirect=True)
self._redirects.append((requestid, res))
def _processredirect(self, rid, res):
"""Called to continue processing a response from a redirect."""
response = self._responses[rid]
try:
data = res.read(32768)
response._onresponsedata(data)
# We're at end of stream.
if not data:
response._oninputcomplete()
if rid not in self._futures:
return
if response.command not in COMMAND_DECODERS:
self._futures[rid].set_result(response.objects())
del self._futures[rid]
elif response._inputcomplete:
decoded = COMMAND_DECODERS[response.command](response.objects())
self._futures[rid].set_result(decoded)
del self._futures[rid]
return bool(data)
except BaseException as e:
self._futures[rid].set_exception(e)
del self._futures[rid]
response._oninputcomplete()
return False
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodebranchmap(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Response should be a single CBOR map of branch name to array of nodes.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 bm = next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
return {encoding.tolocal(k): v for k, v in bm.items()}
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodeheads(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Array of node bytestrings.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodeknown(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Bytestring where each byte is a 0 or 1.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 raw = next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
return [True if c == '1' else False for c in raw]
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodelistkeys(objs):
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739 # Map with bytestring keys and values.
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodelookup(objs):
return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
Gregory Szorc
wireprotov2: change command response protocol to include a leading map...
r37743 def decodepushkey(objs):
return next(objs)
Gregory Szorc
wireprotov2: decode responses to their expected types...
r37739
COMMAND_DECODERS = {
'branchmap': decodebranchmap,
'heads': decodeheads,
'known': decodeknown,
'listkeys': decodelistkeys,
'lookup': decodelookup,
'pushkey': decodepushkey,
}