wireprotov2peer.py
524 lines
| 17.4 KiB
| text/x-python
|
PythonLexer
/ mercurial / wireprotov2peer.py
Gregory Szorc
|
# wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import | ||||
Gregory Szorc
|
r39597 | import threading | ||
Gregory Szorc
|
r37737 | from .i18n import _ | ||
from . import ( | ||||
Gregory Szorc
|
r37739 | encoding, | ||
Gregory Szorc
|
r37737 | error, | ||
Gregory Szorc
|
r40062 | pycompat, | ||
Gregory Szorc
|
r40060 | sslutil, | ||
Gregory Szorc
|
r40062 | url as urlmod, | ||
util, | ||||
Gregory Szorc
|
r37737 | wireprotoframing, | ||
Gregory Szorc
|
r40062 | wireprototypes, | ||
Gregory Szorc
|
r37737 | ) | ||
Gregory Szorc
|
r39481 | from .utils import ( | ||
cborutil, | ||||
) | ||||
Gregory Szorc
|
r37737 | |||
Gregory Szorc
|
def formatrichmessage(atoms):
    """Render a sequence of message atoms into a single bytestring.

    Each atom is a mapping with a ``b'msg'`` entry, which is passed through
    the translation function, and an optional ``b'args'`` entry whose items
    are %-interpolated into the translated message. The rendered atoms are
    concatenated in order with no separator.
    """
    def render(atom):
        # Translate first, then interpolate, so the lookup is done on the
        # message template rather than on the formatted text.
        text = _(atom[b'msg'])
        if b'args' in atom:
            text = text % tuple(atom[b'args'])
        return text

    return b''.join([render(atom) for atom in atoms])
Gregory Szorc
|
# URL schemes a redirect target may use for this client to consider it.
SUPPORTED_REDIRECT_PROTOCOLS = {
    b'http',
    b'https',
}

# Content hash names this client is willing to advertise for redirects.
SUPPORTED_CONTENT_HASHES = {
    b'sha1',
    b'sha256',
}
def redirecttargetsupported(ui, target):
    """Report whether this client can use a given redirect target.

    ``target`` is one entry of the redirect targets advertised in the
    server's capabilities data. The target is rejected (``False``) when its
    protocol is not in ``SUPPORTED_REDIRECT_PROTOCOLS``, when it requires SNI
    and ``sslutil.hassni`` is false, or when it lists TLS versions none of
    which appear in ``sslutil.supportedprotocols``. Every outcome is
    reported through ``ui.note()``.
    """
    protocol = target.get(b'protocol')
    if protocol not in SUPPORTED_REDIRECT_PROTOCOLS:
        ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
                % (target[b'name'], target.get(b'protocol', b'')))
        return False

    if target.get(b'snirequired') and not sslutil.hassni:
        ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
                target[b'name'])
        return False

    if b'tlsversions' in target:
        wanted = set(target[b'tlsversions'])

        # Local protocol names look like ``tls1.2``; the advertised versions
        # omit the ``tls`` prefix, so strip it before comparing.
        local = set()
        for proto in sslutil.supportedprotocols:
            assert proto.startswith(b'tls')
            local.add(proto[3:])

        if wanted.isdisjoint(local):
            ui.note(_('(remote redirect target %s requires unsupported TLS '
                      'versions: %s)\n') % (
                target[b'name'], b', '.join(sorted(wanted))))
            return False

    ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
    return True
def supportedredirects(ui, apidescriptor):
    """Resolve the "redirect" command request key given an API descriptor.

    Given an API descriptor returned by the server, returns a data structure
    that can be used in the "redirect" field of command requests to advertise
    support for compatible redirect targets.

    Returns None if the descriptor is empty or has no ``b'redirect'`` entry.
    Otherwise returns a dict with ``b'targets'`` (names of the advertised
    targets accepted by ``redirecttargetsupported()``) and ``b'hashes'``
    (advertised hashes found in ``SUPPORTED_CONTENT_HASHES``).

    NOTE(review): a dict is returned even when both lists end up empty;
    confirm whether callers expect None in that case.
    """
    if not apidescriptor or b'redirect' not in apidescriptor:
        return None

    advertised = apidescriptor[b'redirect']

    targets = []
    for entry in advertised[b'targets']:
        if redirecttargetsupported(ui, entry):
            targets.append(entry[b'name'])

    hashes = []
    for name in advertised[b'hashes']:
        if name in SUPPORTED_CONTENT_HASHES:
            hashes.append(name)

    return {
        b'targets': targets,
        b'hashes': hashes,
    }
Gregory Szorc
|
class commandresponse(object):
    """Represents the response to a command request.

    Instances track the state of the command and hold its results.

    An external entity is required to update the state of the object when
    events occur: it feeds raw response bytes via ``_onresponsedata()``,
    signals end of input via ``_oninputcomplete()`` and reports failures via
    ``_onerror()``. Consumers read decoded objects through ``objects()``.
    """

    def __init__(self, requestid, command, fromredirect=False):
        """Create the response tracker for ``command`` / ``requestid``.

        ``fromredirect`` marks a response whose data comes from a followed
        content redirect; such a stream is not expected to start with the
        initial status object.
        """
        self.requestid = requestid
        self.command = command
        self.fromredirect = fromredirect

        # Whether all remote input related to this command has been
        # received.
        self._inputcomplete = False

        # We have a lock that is acquired when important object state is
        # mutated. This is to prevent race conditions between 1 thread
        # sending us new data and another consuming it.
        self._lock = threading.RLock()

        # An event is set when state of the object changes. This event
        # is waited on by the generator emitting objects.
        self._serviceable = threading.Event()

        self._pendingevents = []
        self._pendingerror = None
        self._decoder = cborutil.bufferingdecoder()
        self._seeninitial = False
        self._redirect = None

    def _oninputcomplete(self):
        """Record that no more input will arrive and wake up consumers."""
        with self._lock:
            self._inputcomplete = True
            self._serviceable.set()

    def _onresponsedata(self, data):
        """Feed raw response bytes into the decoder and queue whole objects.

        Raises ``error.Abort`` if an object arrives after a content redirect
        was announced, and whatever ``_handleinitial()`` raises for an error
        status.
        """
        available, readcount, wanted = self._decoder.decode(data)

        if not available:
            return

        with self._lock:
            for o in self._decoder.getavailable():
                if not self._seeninitial and not self.fromredirect:
                    self._handleinitial(o)
                    continue

                # We should never see an object after a content redirect,
                # as the spec says the main status object containing the
                # content redirect is the only object in the stream. Fail
                # if we see a misbehaving server.
                if self._redirect:
                    raise error.Abort(_('received unexpected response data '
                                        'after content redirect; the remote is '
                                        'buggy'))

                self._pendingevents.append(o)

            self._serviceable.set()

    def _onerror(self, e):
        """Store ``e`` so that ``objects()`` raises it, and wake consumers."""
        # Mutate under the lock like every other state change; the error
        # is stored before the event is set so a woken consumer sees it.
        with self._lock:
            self._pendingerror = e
            self._serviceable.set()

    def _handleinitial(self, o):
        """Process the first object of a response (the status object).

        ``ok`` needs no action; ``redirect`` records the alternate location
        in ``self._redirect``; any other status is treated as an error and
        raised as ``error.RepoError``.
        """
        self._seeninitial = True

        if o[b'status'] == b'ok':
            return

        elif o[b'status'] == b'redirect':
            l = o[b'location']
            self._redirect = wireprototypes.alternatelocationresponse(
                url=l[b'url'],
                mediatype=l[b'mediatype'],
                size=l.get(b'size'),
                fullhashes=l.get(b'fullhashes'),
                fullhashseed=l.get(b'fullhashseed'),
                serverdercerts=l.get(b'serverdercerts'),
                servercadercerts=l.get(b'servercadercerts'))
            return

        # Keys must be bytes: formatrichmessage() looks up b'msg' / b'args'.
        atoms = [{b'msg': o[b'error'][b'message']}]
        if b'args' in o[b'error']:
            atoms[0][b'args'] = o[b'error'][b'args']

        raise error.RepoError(formatrichmessage(atoms))

    def objects(self):
        """Obtain decoded objects from this response.

        This is a generator of data structures that were decoded from the
        command response.

        Obtaining the next member of the generator may block due to waiting
        on external data to become available.

        If the server encountered an error in the middle of serving the data
        or if another error occurred, an exception may be raised when
        advancing the generator.
        """
        while True:
            # TODO this can infinite loop if self._inputcomplete is never
            # set. We likely want to tie the lifetime of this object/state
            # to that of the background thread receiving frames and updating
            # our state.
            self._serviceable.wait(1.0)

            if self._pendingerror:
                raise self._pendingerror

            with self._lock:
                self._serviceable.clear()

                # Make copies because objects could be mutated during
                # iteration.
                stop = self._inputcomplete
                pending = list(self._pendingevents)
                self._pendingevents[:] = []

            for o in pending:
                yield o

            if stop:
                break
Gregory Szorc
|
r37738 | |||
Gregory Szorc
|
class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor, opener=None,
                 requestbuilder=util.urlreq.request):
        """Create a handler bound to ``clientreactor``.

        ``opener`` and ``requestbuilder`` are used to fetch content
        redirects; ``opener`` defaults to ``urlmod.opener(ui)``.
        """
        self._ui = ui
        self._reactor = clientreactor

        # All three maps are keyed by request ID.
        self._requests = {}
        self._futures = {}
        self._responses = {}

        # Queue of (request ID, response file object) pairs for content
        # redirects that are still being read.
        self._redirects = []
        self._frameseof = False
        self._opener = opener or urlmod.opener(ui)
        self._requestbuilder = requestbuilder

    def callcommand(self, command, args, f, redirect=None):
        """Register a request to call a command.

        ``f`` is the future that will receive the command's result or
        exception.

        Returns an iterable of frames that should be sent over the wire.
        """
        request, action, meta = self._reactor.callcommand(command, args,
                                                          redirect=redirect)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        # TODO we need some kind of lifetime on response instances otherwise
        # objects() may deadlock.
        self._responses[rid] = commandresponse(rid, command)

        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readdata(self, framefh):
        """Attempt to read data and do work.

        Reads at most one frame from ``framefh`` (until it reports end of
        frames) and at most one chunk from the oldest pending redirect.

        Returns None once the frame stream is exhausted and no redirects
        remain. Presumably this means we're done with all read I/O.
        Returns True otherwise.
        """
        if not self._frameseof:
            frame = wireprotoframing.readframe(framefh)
            if frame is None:
                # TODO tell reactor?
                self._frameseof = True
            else:
                self._ui.note(_('received %r\n') % frame)
                self._processframe(frame)

        # Also try to read the first redirect.
        if self._redirects:
            if not self._processredirect(*self._redirects[0]):
                self._redirects.pop(0)

        if self._frameseof and not self._redirects:
            return None

        return True

    def _processframe(self, frame):
        """Process a single read frame."""

        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._responses:
                self._responses[frame.requestid]._oninputcomplete()

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
                del self._futures[frame.requestid]
            else:
                raise e

            return
        elif action == 'noop':
            return
        elif action != 'responsedata':
            raise error.ProgrammingError('action not handled: %s' % action)

        # Only ``responsedata`` reaches this point.
        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        # Any failures processing this frame should bubble up to the
        # future tracking the request.
        try:
            self._processresponsedata(frame, meta, response)
        except BaseException as e:
            # If an exception occurs before the future is resolved,
            # fail the future. Otherwise, we stuff the exception on
            # the response object so it can be raised during objects()
            # iteration. If nothing is consuming objects(), we could
            # silently swallow this exception. That's a risk we'll have to
            # take.
            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
                del self._futures[frame.requestid]
                response._oninputcomplete()
            else:
                response._onerror(e)

    def _resolvefuture(self, rid, response):
        """Resolve the future for ``rid`` if it is pending and resolvable.

        Commands without an entry in ``COMMAND_DECODERS`` resolve right away
        with the ``objects()`` generator. Commands with a decoder resolve
        only once all input has been received. Exceptions raised by the
        decoder propagate to the caller and leave the future registered.
        """
        if rid not in self._futures:
            return

        if response.command not in COMMAND_DECODERS:
            self._futures[rid].set_result(response.objects())
        elif response._inputcomplete:
            decoded = COMMAND_DECODERS[response.command](response.objects())
            self._futures[rid].set_result(decoded)
        else:
            # Decoder present but input still incomplete: keep waiting.
            return

        del self._futures[rid]

    def _processresponsedata(self, frame, meta, response):
        """Feed a ``responsedata`` frame to ``response`` and settle state.

        May raise; the caller routes the exception to the future or the
        response object.
        """
        # This can raise. The caller can handle it.
        response._onresponsedata(meta['data'])

        # If we got a content redirect response, we want to fetch it and
        # expose the data as if we received it inline. But we also want to
        # keep our internal request accounting in order. Our strategy is to
        # basically put meaningful response handling on pause until EOS occurs
        # and the stream accounting is in a good state. At that point, we follow
        # the redirect and replace the response object with its data.
        redirect = response._redirect

        if meta['eos']:
            response._oninputcomplete()
            del self._requests[frame.requestid]

            if redirect:
                self._followredirect(frame.requestid, redirect)
                return

        if redirect:
            # Redirect announced but stream not finished: the future is
            # resolved later from the redirected content.
            return

        # If the command has a decoder, we wait until all input has been
        # received before resolving the future. Otherwise we resolve the
        # future immediately.
        self._resolvefuture(frame.requestid, response)

    def _followredirect(self, requestid, redirect):
        """Called to initiate redirect following for a request."""
        self._ui.note(_('(following redirect to %s)\n') % redirect.url)

        # TODO handle framed responses.
        if redirect.mediatype != b'application/mercurial-cbor':
            raise error.Abort(_('cannot handle redirects for the %s media type')
                              % redirect.mediatype)

        if redirect.fullhashes:
            self._ui.warn(_('(support for validating hashes on content '
                            'redirects not supported)\n'))

        if redirect.serverdercerts or redirect.servercadercerts:
            self._ui.warn(_('(support for pinning server certificates on '
                            'content redirects not supported)\n'))

        headers = {
            r'Accept': redirect.mediatype,
        }

        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)

        try:
            res = self._opener.open(req)
        except util.urlerr.httperror as e:
            if e.code == 401:
                raise error.Abort(_('authorization failed'))
            raise
        except util.httplib.HTTPException as e:
            self._ui.debug('http error requesting %s\n' % req.get_full_url())
            self._ui.traceback()
            raise IOError(None, e)

        urlmod.wrapresponse(res)

        # The existing response object is associated with frame data. Rather
        # than try to normalize its state, just create a new object.
        oldresponse = self._responses[requestid]
        self._responses[requestid] = commandresponse(requestid,
                                                     oldresponse.command,
                                                     fromredirect=True)

        self._redirects.append((requestid, res))

    def _processredirect(self, rid, res):
        """Called to continue processing a response from a redirect.

        Reads one chunk from ``res`` and feeds it to the response object.

        Returns a bool indicating whether the redirect is still serviceable,
        i.e. whether more data may follow. ``readdata()`` drops the redirect
        from its queue as soon as this returns a false value.
        """
        response = self._responses[rid]

        try:
            data = res.read(32768)
            response._onresponsedata(data)

            # We're at end of stream.
            if not data:
                response._oninputcomplete()

            self._resolvefuture(rid, response)

            # Must be a real bool even when the future was resolved on an
            # earlier chunk; a None here would make readdata() abandon the
            # redirect before its body is fully read.
            return bool(data)

        except BaseException as e:
            # Same policy as _processframe(): fail the future if it is
            # still pending, otherwise surface the error via objects().
            if rid in self._futures:
                self._futures[rid].set_exception(e)
                del self._futures[rid]
                response._oninputcomplete()
            else:
                response._onerror(e)

            return False
Gregory Szorc
|
def decodebranchmap(objs):
    """Decode a ``branchmap`` response.

    The first object of ``objs`` is a single CBOR map of branch name to
    array of nodes. Branch names are converted with ``encoding.tolocal()``;
    the node arrays are passed through unchanged.
    """
    branchmap = next(objs)

    decoded = {}
    for name, nodes in branchmap.items():
        decoded[encoding.tolocal(name)] = nodes

    return decoded
Gregory Szorc
|
def decodeheads(objs):
    """Decode a ``heads`` response: the first object, returned unchanged.

    That object is an array of node bytestrings.
    """
    heads = next(objs)
    return heads
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodeknown(objs):
    """Decode a ``known`` response into a list of booleans.

    The first object of ``objs`` is a bytestring where each byte is an ASCII
    ``0`` or ``1``. The result has one entry per byte: True for ``1``, False
    for anything else.
    """
    raw = next(objs)

    # Iterating ``bytes`` yields 1-char strings on Python 2 but ints on
    # Python 3; bytearray yields ints on both, so compare against the
    # ordinal of ``1``.
    one = ord(b'1')
    return [byte == one for byte in bytearray(raw)]
Gregory Szorc
|
def decodelistkeys(objs):
    """Decode a ``listkeys`` response: the first object, returned unchanged.

    That object is a map with bytestring keys and values.
    """
    keys = next(objs)
    return keys
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodelookup(objs):
    """Decode a ``lookup`` response: the first object, returned unchanged."""
    result = next(objs)
    return result
Gregory Szorc
|
r37739 | |||
Gregory Szorc
|
def decodepushkey(objs):
    """Decode a ``pushkey`` response: the first object, returned unchanged."""
    result = next(objs)
    return result
Gregory Szorc
|
r37739 | |||
# Maps a command name to the function that turns its stream of decoded
# response objects into the final result. Commands without an entry have
# their future resolved with the raw objects() generator instead.
COMMAND_DECODERS = {
    'branchmap': decodebranchmap,
    'heads': decodeheads,
    'known': decodeknown,
    'listkeys': decodelistkeys,
    'lookup': decodelookup,
    'pushkey': decodepushkey,
}