# HG changeset patch # User Gregory Szorc # Date 2018-09-27 01:08:08 # Node ID 7e807b8a9e56760afb7ff90b8120347592d9cde6 # Parent b099e6032f387f2a0c6aa20e001e37bf0bbc41b8 wireprotov2: client support for following content redirects And with the server actually sending content redirects, it is finally time to implement client support for following them! When a redirect response is seen, we wait until all data for that request has been received (it should be nearly immediate since no data is expected to follow the redirect message). Then we use a URL opener to make a request. We stuff that response into the client handler and construct a new response object to track it. When readdata() is called for servicing requests, we attempt to read data from the first redirected response. During data reading, data is processed much as if it had come from a frame payload. The existing test for the functionality demonstrates the client transparently following the redirect and obtaining the command response data from an alternate URL! There is still plenty of work to do here, including shoring up testing. I'm not convinced things will work in the presence of multiple redirect responses. And we don't yet implement support for integrity verification or configuring server certificates to validate the connection. But it's a start. And it should enable us to start experimenting with "real" caches. 
Differential Revision: https://phab.mercurial-scm.org/D4778 diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py --- a/mercurial/httppeer.py +++ b/mercurial/httppeer.py @@ -513,7 +513,9 @@ def sendv2request(ui, opener, requestbui reactor = wireprotoframing.clientreactor(hasmultiplesend=False, buffersends=True) - handler = wireprotov2peer.clienthandler(ui, reactor) + handler = wireprotov2peer.clienthandler(ui, reactor, + opener=opener, + requestbuilder=requestbuilder) url = '%s/%s' % (apiurl, permission) diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -13,8 +13,12 @@ from .i18n import _ from . import ( encoding, error, + pycompat, sslutil, + url as urlmod, + util, wireprotoframing, + wireprototypes, ) from .utils import ( cborutil, @@ -112,9 +116,10 @@ class commandresponse(object): events occur. """ - def __init__(self, requestid, command): + def __init__(self, requestid, command, fromredirect=False): self.requestid = requestid self.command = command + self.fromredirect = fromredirect # Whether all remote input related to this command has been # received. @@ -132,6 +137,7 @@ class commandresponse(object): self._pendingevents = [] self._decoder = cborutil.bufferingdecoder() self._seeninitial = False + self._redirect = None def _oninputcomplete(self): with self._lock: @@ -146,10 +152,19 @@ class commandresponse(object): with self._lock: for o in self._decoder.getavailable(): - if not self._seeninitial: + if not self._seeninitial and not self.fromredirect: self._handleinitial(o) continue + # We should never see an object after a content redirect, + # as the spec says the main status object containing the + # content redirect is the only object in the stream. Fail + # if we see a misbehaving server. 
+ if self._redirect: + raise error.Abort(_('received unexpected response data ' + 'after content redirect; the remote is ' + 'buggy')) + self._pendingevents.append(o) self._serviceable.set() @@ -160,7 +175,16 @@ class commandresponse(object): return elif o[b'status'] == b'redirect': - raise error.Abort(_('redirect responses not yet supported')) + l = o[b'location'] + self._redirect = wireprototypes.alternatelocationresponse( + url=l[b'url'], + mediatype=l[b'mediatype'], + size=l.get(b'size'), + fullhashes=l.get(b'fullhashes'), + fullhashseed=l.get(b'fullhashseed'), + serverdercerts=l.get(b'serverdercerts'), + servercadercerts=l.get(b'servercadercerts')) + return atoms = [{'msg': o[b'error'][b'message']}] if b'args' in o[b'error']: @@ -214,13 +238,17 @@ class clienthandler(object): with the higher-level peer API. """ - def __init__(self, ui, clientreactor): + def __init__(self, ui, clientreactor, opener=None, + requestbuilder=util.urlreq.request): self._ui = ui self._reactor = clientreactor self._requests = {} self._futures = {} self._responses = {} + self._redirects = [] self._frameseof = False + self._opener = opener or urlmod.opener(ui) + self._requestbuilder = requestbuilder def callcommand(self, command, args, f, redirect=None): """Register a request to call a command. @@ -269,7 +297,12 @@ class clienthandler(object): self._ui.note(_('received %r\n') % frame) self._processframe(frame) - if self._frameseof: + # Also try to read the first redirect. + if self._redirects: + if not self._processredirect(*self._redirects[0]): + self._redirects.pop(0) + + if self._frameseof and not self._redirects: return None return True @@ -318,10 +351,27 @@ class clienthandler(object): # This can raise. The caller can handle it. response._onresponsedata(meta['data']) + # If we got a content redirect response, we want to fetch it and + # expose the data as if we received it inline. But we also want to + # keep our internal request accounting in order. 
Our strategy is to + # basically put meaningful response handling on pause until EOS occurs + # and the stream accounting is in a good state. At that point, we follow + # the redirect and replace the response object with its data. + + redirect = response._redirect + handlefuture = False if redirect else True + if meta['eos']: response._oninputcomplete() del self._requests[frame.requestid] + if redirect: + self._followredirect(frame.requestid, redirect) + return + + if not handlefuture: + return + # If the command has a decoder, we wait until all input has been # received before resolving the future. Otherwise we resolve the # future immediately. @@ -336,6 +386,82 @@ class clienthandler(object): self._futures[frame.requestid].set_result(decoded) del self._futures[frame.requestid] + def _followredirect(self, requestid, redirect): + """Called to initiate redirect following for a request.""" + self._ui.note(_('(following redirect to %s)\n') % redirect.url) + + # TODO handle framed responses. + if redirect.mediatype != b'application/mercurial-cbor': + raise error.Abort(_('cannot handle redirects for the %s media type') + % redirect.mediatype) + + if redirect.fullhashes: + self._ui.warn(_('(support for validating hashes on content ' + 'redirects not supported)\n')) + + if redirect.serverdercerts or redirect.servercadercerts: + self._ui.warn(_('(support for pinning server certificates on ' + 'content redirects not supported)\n')) + + headers = { + r'Accept': redirect.mediatype, + } + + req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers) + + try: + res = self._opener.open(req) + except util.urlerr.httperror as e: + if e.code == 401: + raise error.Abort(_('authorization failed')) + raise + except util.httplib.HTTPException as e: + self._ui.debug('http error requesting %s\n' % req.get_full_url()) + self._ui.traceback() + raise IOError(None, e) + + urlmod.wrapresponse(res) + + # The existing response object is associated with frame data. 
Rather + # than try to normalize its state, just create a new object. + oldresponse = self._responses[requestid] + self._responses[requestid] = commandresponse(requestid, + oldresponse.command, + fromredirect=True) + + self._redirects.append((requestid, res)) + + def _processredirect(self, rid, res): + """Called to continue processing a response from a redirect.""" + response = self._responses[rid] + + try: + data = res.read(32768) + response._onresponsedata(data) + + # We're at end of stream. + if not data: + response._oninputcomplete() + + if rid not in self._futures: + return + + if response.command not in COMMAND_DECODERS: + self._futures[rid].set_result(response.objects()) + del self._futures[rid] + elif response._inputcomplete: + decoded = COMMAND_DECODERS[response.command](response.objects()) + self._futures[rid].set_result(decoded) + del self._futures[rid] + + return bool(data) + + except BaseException as e: + self._futures[rid].set_exception(e) + del self._futures[rid] + response._oninputcomplete() + return False + def decodebranchmap(objs): # Response should be a single CBOR map of branch name to array of nodes. 
bm = next(objs) diff --git a/tests/test-wireproto-content-redirects.t b/tests/test-wireproto-content-redirects.t --- a/tests/test-wireproto-content-redirects.t +++ b/tests/test-wireproto-content-redirects.t @@ -1354,8 +1354,33 @@ 2nd request should result in content red s> 0\r\n s> \r\n received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) - abort: redirect responses not yet supported - [255] + (following redirect to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0) (glob) + s> GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n + s> Accept-Encoding: identity\r\n + s> accept: application/mercurial-cbor\r\n + s> host: *:$HGPORT\r\n (glob) + s> user-agent: Mercurial debugwireproto\r\n + s> \r\n + s> makefile('rb', None) + s> HTTP/1.1 200 OK\r\n + s> Server: testing stub value\r\n + s> Date: $HTTP_DATE$\r\n + s> Content-Type: application/mercurial-cbor\r\n + s> Content-Length: 91\r\n + s> \r\n + s> \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 + response: gen[ + { + b'totalitems': 1 + }, + { + b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A', + b'parents': [ + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + ] + } + ] $ cat error.log $ killdaemons.py