|
|
# wireprotov2peer.py - client side code for wire protocol version 2
|
|
|
#
|
|
|
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
|
|
|
#
|
|
|
# This software may be used and distributed according to the terms of the
|
|
|
# GNU General Public License version 2 or any later version.
|
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
import threading
|
|
|
|
|
|
from .i18n import _
|
|
|
from . import (
|
|
|
encoding,
|
|
|
error,
|
|
|
pycompat,
|
|
|
sslutil,
|
|
|
url as urlmod,
|
|
|
util,
|
|
|
wireprotoframing,
|
|
|
wireprototypes,
|
|
|
)
|
|
|
from .utils import (
|
|
|
cborutil,
|
|
|
)
|
|
|
|
|
|
def formatrichmessage(atoms):
|
|
|
"""Format an encoded message from the framing protocol."""
|
|
|
|
|
|
chunks = []
|
|
|
|
|
|
for atom in atoms:
|
|
|
msg = _(atom[b'msg'])
|
|
|
|
|
|
if b'args' in atom:
|
|
|
msg = msg % tuple(atom[b'args'])
|
|
|
|
|
|
chunks.append(msg)
|
|
|
|
|
|
return b''.join(chunks)
|
|
|
|
|
|
SUPPORTED_REDIRECT_PROTOCOLS = {
|
|
|
b'http',
|
|
|
b'https',
|
|
|
}
|
|
|
|
|
|
SUPPORTED_CONTENT_HASHES = {
|
|
|
b'sha1',
|
|
|
b'sha256',
|
|
|
}
|
|
|
|
|
|
def redirecttargetsupported(ui, target):
|
|
|
"""Determine whether a redirect target entry is supported.
|
|
|
|
|
|
``target`` should come from the capabilities data structure emitted by
|
|
|
the server.
|
|
|
"""
|
|
|
if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
|
|
|
ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
|
|
|
% (target[b'name'], target.get(b'protocol', b'')))
|
|
|
return False
|
|
|
|
|
|
if target.get(b'snirequired') and not sslutil.hassni:
|
|
|
ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
|
|
|
target[b'name'])
|
|
|
return False
|
|
|
|
|
|
if b'tlsversions' in target:
|
|
|
tlsversions = set(target[b'tlsversions'])
|
|
|
supported = set()
|
|
|
|
|
|
for v in sslutil.supportedprotocols:
|
|
|
assert v.startswith(b'tls')
|
|
|
supported.add(v[3:])
|
|
|
|
|
|
if not tlsversions & supported:
|
|
|
ui.note(_('(remote redirect target %s requires unsupported TLS '
|
|
|
'versions: %s)\n') % (
|
|
|
target[b'name'], b', '.join(sorted(tlsversions))))
|
|
|
return False
|
|
|
|
|
|
ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
|
|
|
|
|
|
return True
|
|
|
|
|
|
def supportedredirects(ui, apidescriptor):
|
|
|
"""Resolve the "redirect" command request key given an API descriptor.
|
|
|
|
|
|
Given an API descriptor returned by the server, returns a data structure
|
|
|
that can be used in hte "redirect" field of command requests to advertise
|
|
|
support for compatible redirect targets.
|
|
|
|
|
|
Returns None if no redirect targets are remotely advertised or if none are
|
|
|
supported.
|
|
|
"""
|
|
|
if not apidescriptor or b'redirect' not in apidescriptor:
|
|
|
return None
|
|
|
|
|
|
targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
|
|
|
if redirecttargetsupported(ui, t)]
|
|
|
|
|
|
hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
|
|
|
if h in SUPPORTED_CONTENT_HASHES]
|
|
|
|
|
|
return {
|
|
|
b'targets': targets,
|
|
|
b'hashes': hashes,
|
|
|
}
|
|
|
|
|
|
class commandresponse(object):
|
|
|
"""Represents the response to a command request.
|
|
|
|
|
|
Instances track the state of the command and hold its results.
|
|
|
|
|
|
An external entity is required to update the state of the object when
|
|
|
events occur.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, requestid, command, fromredirect=False):
|
|
|
self.requestid = requestid
|
|
|
self.command = command
|
|
|
self.fromredirect = fromredirect
|
|
|
|
|
|
# Whether all remote input related to this command has been
|
|
|
# received.
|
|
|
self._inputcomplete = False
|
|
|
|
|
|
# We have a lock that is acquired when important object state is
|
|
|
# mutated. This is to prevent race conditions between 1 thread
|
|
|
# sending us new data and another consuming it.
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
|
# An event is set when state of the object changes. This event
|
|
|
# is waited on by the generator emitting objects.
|
|
|
self._serviceable = threading.Event()
|
|
|
|
|
|
self._pendingevents = []
|
|
|
self._decoder = cborutil.bufferingdecoder()
|
|
|
self._seeninitial = False
|
|
|
self._redirect = None
|
|
|
|
|
|
def _oninputcomplete(self):
|
|
|
with self._lock:
|
|
|
self._inputcomplete = True
|
|
|
self._serviceable.set()
|
|
|
|
|
|
def _onresponsedata(self, data):
|
|
|
available, readcount, wanted = self._decoder.decode(data)
|
|
|
|
|
|
if not available:
|
|
|
return
|
|
|
|
|
|
with self._lock:
|
|
|
for o in self._decoder.getavailable():
|
|
|
if not self._seeninitial and not self.fromredirect:
|
|
|
self._handleinitial(o)
|
|
|
continue
|
|
|
|
|
|
# We should never see an object after a content redirect,
|
|
|
# as the spec says the main status object containing the
|
|
|
# content redirect is the only object in the stream. Fail
|
|
|
# if we see a misbehaving server.
|
|
|
if self._redirect:
|
|
|
raise error.Abort(_('received unexpected response data '
|
|
|
'after content redirect; the remote is '
|
|
|
'buggy'))
|
|
|
|
|
|
self._pendingevents.append(o)
|
|
|
|
|
|
self._serviceable.set()
|
|
|
|
|
|
def _handleinitial(self, o):
|
|
|
self._seeninitial = True
|
|
|
if o[b'status'] == b'ok':
|
|
|
return
|
|
|
|
|
|
elif o[b'status'] == b'redirect':
|
|
|
l = o[b'location']
|
|
|
self._redirect = wireprototypes.alternatelocationresponse(
|
|
|
url=l[b'url'],
|
|
|
mediatype=l[b'mediatype'],
|
|
|
size=l.get(b'size'),
|
|
|
fullhashes=l.get(b'fullhashes'),
|
|
|
fullhashseed=l.get(b'fullhashseed'),
|
|
|
serverdercerts=l.get(b'serverdercerts'),
|
|
|
servercadercerts=l.get(b'servercadercerts'))
|
|
|
return
|
|
|
|
|
|
atoms = [{'msg': o[b'error'][b'message']}]
|
|
|
if b'args' in o[b'error']:
|
|
|
atoms[0]['args'] = o[b'error'][b'args']
|
|
|
|
|
|
raise error.RepoError(formatrichmessage(atoms))
|
|
|
|
|
|
def objects(self):
|
|
|
"""Obtained decoded objects from this response.
|
|
|
|
|
|
This is a generator of data structures that were decoded from the
|
|
|
command response.
|
|
|
|
|
|
Obtaining the next member of the generator may block due to waiting
|
|
|
on external data to become available.
|
|
|
|
|
|
If the server encountered an error in the middle of serving the data
|
|
|
or if another error occurred, an exception may be raised when
|
|
|
advancing the generator.
|
|
|
"""
|
|
|
while True:
|
|
|
# TODO this can infinite loop if self._inputcomplete is never
|
|
|
# set. We likely want to tie the lifetime of this object/state
|
|
|
# to that of the background thread receiving frames and updating
|
|
|
# our state.
|
|
|
self._serviceable.wait(1.0)
|
|
|
|
|
|
with self._lock:
|
|
|
self._serviceable.clear()
|
|
|
|
|
|
# Make copies because objects could be mutated during
|
|
|
# iteration.
|
|
|
stop = self._inputcomplete
|
|
|
pending = list(self._pendingevents)
|
|
|
self._pendingevents[:] = []
|
|
|
|
|
|
for o in pending:
|
|
|
yield o
|
|
|
|
|
|
if stop:
|
|
|
break
|
|
|
|
|
|
class clienthandler(object):
|
|
|
"""Object to handle higher-level client activities.
|
|
|
|
|
|
The ``clientreactor`` is used to hold low-level state about the frame-based
|
|
|
protocol, such as which requests and streams are active. This type is used
|
|
|
for higher-level operations, such as reading frames from a socket, exposing
|
|
|
and managing a higher-level primitive for representing command responses,
|
|
|
etc. This class is what peers should probably use to bridge wire activity
|
|
|
with the higher-level peer API.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, ui, clientreactor, opener=None,
|
|
|
requestbuilder=util.urlreq.request):
|
|
|
self._ui = ui
|
|
|
self._reactor = clientreactor
|
|
|
self._requests = {}
|
|
|
self._futures = {}
|
|
|
self._responses = {}
|
|
|
self._redirects = []
|
|
|
self._frameseof = False
|
|
|
self._opener = opener or urlmod.opener(ui)
|
|
|
self._requestbuilder = requestbuilder
|
|
|
|
|
|
def callcommand(self, command, args, f, redirect=None):
|
|
|
"""Register a request to call a command.
|
|
|
|
|
|
Returns an iterable of frames that should be sent over the wire.
|
|
|
"""
|
|
|
request, action, meta = self._reactor.callcommand(command, args,
|
|
|
redirect=redirect)
|
|
|
|
|
|
if action != 'noop':
|
|
|
raise error.ProgrammingError('%s not yet supported' % action)
|
|
|
|
|
|
rid = request.requestid
|
|
|
self._requests[rid] = request
|
|
|
self._futures[rid] = f
|
|
|
# TODO we need some kind of lifetime on response instances otherwise
|
|
|
# objects() may deadlock.
|
|
|
self._responses[rid] = commandresponse(rid, command)
|
|
|
|
|
|
return iter(())
|
|
|
|
|
|
def flushcommands(self):
|
|
|
"""Flush all queued commands.
|
|
|
|
|
|
Returns an iterable of frames that should be sent over the wire.
|
|
|
"""
|
|
|
action, meta = self._reactor.flushcommands()
|
|
|
|
|
|
if action != 'sendframes':
|
|
|
raise error.ProgrammingError('%s not yet supported' % action)
|
|
|
|
|
|
return meta['framegen']
|
|
|
|
|
|
def readdata(self, framefh):
|
|
|
"""Attempt to read data and do work.
|
|
|
|
|
|
Returns None if no data was read. Presumably this means we're
|
|
|
done with all read I/O.
|
|
|
"""
|
|
|
if not self._frameseof:
|
|
|
frame = wireprotoframing.readframe(framefh)
|
|
|
if frame is None:
|
|
|
# TODO tell reactor?
|
|
|
self._frameseof = True
|
|
|
else:
|
|
|
self._ui.note(_('received %r\n') % frame)
|
|
|
self._processframe(frame)
|
|
|
|
|
|
# Also try to read the first redirect.
|
|
|
if self._redirects:
|
|
|
if not self._processredirect(*self._redirects[0]):
|
|
|
self._redirects.pop(0)
|
|
|
|
|
|
if self._frameseof and not self._redirects:
|
|
|
return None
|
|
|
|
|
|
return True
|
|
|
|
|
|
def _processframe(self, frame):
|
|
|
"""Process a single read frame."""
|
|
|
|
|
|
action, meta = self._reactor.onframerecv(frame)
|
|
|
|
|
|
if action == 'error':
|
|
|
e = error.RepoError(meta['message'])
|
|
|
|
|
|
if frame.requestid in self._responses:
|
|
|
self._responses[frame.requestid]._oninputcomplete()
|
|
|
|
|
|
if frame.requestid in self._futures:
|
|
|
self._futures[frame.requestid].set_exception(e)
|
|
|
del self._futures[frame.requestid]
|
|
|
else:
|
|
|
raise e
|
|
|
|
|
|
return
|
|
|
|
|
|
if frame.requestid not in self._requests:
|
|
|
raise error.ProgrammingError(
|
|
|
'received frame for unknown request; this is either a bug in '
|
|
|
'the clientreactor not screening for this or this instance was '
|
|
|
'never told about this request: %r' % frame)
|
|
|
|
|
|
response = self._responses[frame.requestid]
|
|
|
|
|
|
if action == 'responsedata':
|
|
|
# Any failures processing this frame should bubble up to the
|
|
|
# future tracking the request.
|
|
|
try:
|
|
|
self._processresponsedata(frame, meta, response)
|
|
|
except BaseException as e:
|
|
|
self._futures[frame.requestid].set_exception(e)
|
|
|
del self._futures[frame.requestid]
|
|
|
response._oninputcomplete()
|
|
|
else:
|
|
|
raise error.ProgrammingError(
|
|
|
'unhandled action from clientreactor: %s' % action)
|
|
|
|
|
|
def _processresponsedata(self, frame, meta, response):
|
|
|
# This can raise. The caller can handle it.
|
|
|
response._onresponsedata(meta['data'])
|
|
|
|
|
|
# If we got a content redirect response, we want to fetch it and
|
|
|
# expose the data as if we received it inline. But we also want to
|
|
|
# keep our internal request accounting in order. Our strategy is to
|
|
|
# basically put meaningful response handling on pause until EOS occurs
|
|
|
# and the stream accounting is in a good state. At that point, we follow
|
|
|
# the redirect and replace the response object with its data.
|
|
|
|
|
|
redirect = response._redirect
|
|
|
handlefuture = False if redirect else True
|
|
|
|
|
|
if meta['eos']:
|
|
|
response._oninputcomplete()
|
|
|
del self._requests[frame.requestid]
|
|
|
|
|
|
if redirect:
|
|
|
self._followredirect(frame.requestid, redirect)
|
|
|
return
|
|
|
|
|
|
if not handlefuture:
|
|
|
return
|
|
|
|
|
|
# If the command has a decoder, we wait until all input has been
|
|
|
# received before resolving the future. Otherwise we resolve the
|
|
|
# future immediately.
|
|
|
if frame.requestid not in self._futures:
|
|
|
return
|
|
|
|
|
|
if response.command not in COMMAND_DECODERS:
|
|
|
self._futures[frame.requestid].set_result(response.objects())
|
|
|
del self._futures[frame.requestid]
|
|
|
elif response._inputcomplete:
|
|
|
decoded = COMMAND_DECODERS[response.command](response.objects())
|
|
|
self._futures[frame.requestid].set_result(decoded)
|
|
|
del self._futures[frame.requestid]
|
|
|
|
|
|
def _followredirect(self, requestid, redirect):
|
|
|
"""Called to initiate redirect following for a request."""
|
|
|
self._ui.note(_('(following redirect to %s)\n') % redirect.url)
|
|
|
|
|
|
# TODO handle framed responses.
|
|
|
if redirect.mediatype != b'application/mercurial-cbor':
|
|
|
raise error.Abort(_('cannot handle redirects for the %s media type')
|
|
|
% redirect.mediatype)
|
|
|
|
|
|
if redirect.fullhashes:
|
|
|
self._ui.warn(_('(support for validating hashes on content '
|
|
|
'redirects not supported)\n'))
|
|
|
|
|
|
if redirect.serverdercerts or redirect.servercadercerts:
|
|
|
self._ui.warn(_('(support for pinning server certificates on '
|
|
|
'content redirects not supported)\n'))
|
|
|
|
|
|
headers = {
|
|
|
r'Accept': redirect.mediatype,
|
|
|
}
|
|
|
|
|
|
req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
|
|
|
|
|
|
try:
|
|
|
res = self._opener.open(req)
|
|
|
except util.urlerr.httperror as e:
|
|
|
if e.code == 401:
|
|
|
raise error.Abort(_('authorization failed'))
|
|
|
raise
|
|
|
except util.httplib.HTTPException as e:
|
|
|
self._ui.debug('http error requesting %s\n' % req.get_full_url())
|
|
|
self._ui.traceback()
|
|
|
raise IOError(None, e)
|
|
|
|
|
|
urlmod.wrapresponse(res)
|
|
|
|
|
|
# The existing response object is associated with frame data. Rather
|
|
|
# than try to normalize its state, just create a new object.
|
|
|
oldresponse = self._responses[requestid]
|
|
|
self._responses[requestid] = commandresponse(requestid,
|
|
|
oldresponse.command,
|
|
|
fromredirect=True)
|
|
|
|
|
|
self._redirects.append((requestid, res))
|
|
|
|
|
|
def _processredirect(self, rid, res):
|
|
|
"""Called to continue processing a response from a redirect."""
|
|
|
response = self._responses[rid]
|
|
|
|
|
|
try:
|
|
|
data = res.read(32768)
|
|
|
response._onresponsedata(data)
|
|
|
|
|
|
# We're at end of stream.
|
|
|
if not data:
|
|
|
response._oninputcomplete()
|
|
|
|
|
|
if rid not in self._futures:
|
|
|
return
|
|
|
|
|
|
if response.command not in COMMAND_DECODERS:
|
|
|
self._futures[rid].set_result(response.objects())
|
|
|
del self._futures[rid]
|
|
|
elif response._inputcomplete:
|
|
|
decoded = COMMAND_DECODERS[response.command](response.objects())
|
|
|
self._futures[rid].set_result(decoded)
|
|
|
del self._futures[rid]
|
|
|
|
|
|
return bool(data)
|
|
|
|
|
|
except BaseException as e:
|
|
|
self._futures[rid].set_exception(e)
|
|
|
del self._futures[rid]
|
|
|
response._oninputcomplete()
|
|
|
return False
|
|
|
|
|
|
def decodebranchmap(objs):
|
|
|
# Response should be a single CBOR map of branch name to array of nodes.
|
|
|
bm = next(objs)
|
|
|
|
|
|
return {encoding.tolocal(k): v for k, v in bm.items()}
|
|
|
|
|
|
def decodeheads(objs):
|
|
|
# Array of node bytestrings.
|
|
|
return next(objs)
|
|
|
|
|
|
def decodeknown(objs):
|
|
|
# Bytestring where each byte is a 0 or 1.
|
|
|
raw = next(objs)
|
|
|
|
|
|
return [True if c == '1' else False for c in raw]
|
|
|
|
|
|
def decodelistkeys(objs):
|
|
|
# Map with bytestring keys and values.
|
|
|
return next(objs)
|
|
|
|
|
|
def decodelookup(objs):
|
|
|
return next(objs)
|
|
|
|
|
|
def decodepushkey(objs):
|
|
|
return next(objs)
|
|
|
|
|
|
COMMAND_DECODERS = {
|
|
|
'branchmap': decodebranchmap,
|
|
|
'heads': decodeheads,
|
|
|
'known': decodeknown,
|
|
|
'listkeys': decodelistkeys,
|
|
|
'lookup': decodelookup,
|
|
|
'pushkey': decodepushkey,
|
|
|
}
|
|
|
|