##// END OF EJS Templates
wireprotov2: implement commands as a generator of objects...
wireprotov2: implement commands as a generator of objects Previously, wire protocol version 2 inherited version 1's model of having separate types to represent the results of different wire protocol commands. As I implemented more powerful commands in future commits, I found I was using a common pattern of returning a special type to hold a generator. This meant the command function required a closure to do most of the work. That made logic flow more difficult to follow. I also noticed that many commands were effectively a sequence of objects to be CBOR encoded. I think it makes sense to define version 2 commands as generators. This way, commands can simply emit the data structures they wish to send to the client. This eliminates the need for a closure in command functions and removes encoding from the bodies of commands. As part of this commit, the handling of response objects has been moved into the serverreactor class. This puts the reactor in the driver's seat with regards to CBOR encoding and error handling. Having error handling in the function that emits frames is particularly important because exceptions in that function can lead to things getting in a bad state: I'm fairly certain that uncaught exceptions in the frame generator were causing deadlocks. I also introduced a dedicated error type for explicit error reporting in command handlers. This will be used in subsequent commits. There's still a bit of work to be done here, especially around formalizing the error handling "protocol." I've added yet another TODO to track this so we don't forget. Test output changed because we're using generators and no longer know we are at the end of the data until we hit the end of the generator. This means we can't emit the end-of-stream flag until we've exhausted the generator. Hence the introduction of 0-sized end-of-stream frames. Differential Revision: https://phab.mercurial-scm.org/D4472

File last commit:

r39585:089fc0db default
r39595:07b58266 default
Show More
httppeer.py
1002 lines | 33.7 KiB | text/x-python | PythonLexer
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 # httppeer.py - HTTP repository proxy classes for mercurial
#
# Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
# Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Gregory Szorc
httppeer: use absolute_import
r25954 from __future__ import absolute_import
import errno
Augie Fackler
httppeer: add support for httppostargs when we're sending a file...
r33820 import io
Gregory Szorc
httppeer: use absolute_import
r25954 import os
import socket
Gregory Szorc
httppeer: advertise and support application/mercurial-0.2...
r30763 import struct
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 import weakref
Gregory Szorc
httppeer: use absolute_import
r25954
from .i18n import _
from . import (
Martin von Zweigbergk
bundle: move writebundle() from changegroup.py to bundle2.py (API)...
r28666 bundle2,
Gregory Szorc
httppeer: use absolute_import
r25954 error,
httpconnection,
Pulkit Goyal
py3: convert the mode argument of os.fdopen to unicodes (1 of 2)...
r30924 pycompat,
Gregory Szorc
httppeer: implement ipeerconnection...
r37627 repository,
Gregory Szorc
httppeer: use absolute_import
r25954 statichttprepo,
Gregory Szorc
httppeer: alias url as urlmod...
r36977 url as urlmod,
Gregory Szorc
httppeer: use absolute_import
r25954 util,
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 wireprotoframing,
Gregory Szorc
httppeer: support protocol upgrade...
r37576 wireprototypes,
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 wireprotov1peer,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 wireprotov2peer,
Gregory Szorc
wireproto: extract HTTP version 2 code to own module...
r37563 wireprotov2server,
Gregory Szorc
httppeer: use absolute_import
r25954 )
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 from .utils import (
Gregory Szorc
httppeer: use our CBOR decoder...
r39476 cborutil,
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 interfaceutil,
Gregory Szorc
httppeer: log commands for version 2 peer...
r39467 stringutil,
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 )
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Pulkit Goyal
py3: conditionalize httplib import...
r29455 httplib = util.httplib
timeless
pycompat: switch to util.urlreq/util.urlerr for py3 compat
r28883 urlerr = util.urlerr
urlreq = util.urlreq
Gregory Szorc
httppeer: extract code for HTTP header spanning...
r30759 def encodevalueinheaders(value, header, limit):
"""Encode a string value into multiple HTTP headers.
``value`` will be encoded into 1 or more HTTP headers with the names
``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
name + value will be at most ``limit`` bytes long.
Augie Fackler
httppeer: always produce native str header keys and values...
r34733 Returns an iterable of 2-tuples consisting of header names and
values as native strings.
Gregory Szorc
httppeer: extract code for HTTP header spanning...
r30759 """
Augie Fackler
httppeer: always produce native str header keys and values...
r34733 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
# not bytes. This function always takes bytes in as arguments.
fmt = pycompat.strurl(header) + r'-%s'
# Note: it is *NOT* a bug that the last bit here is a bytestring
# and not a unicode: we're just getting the encoded length anyway,
# and using an r-string to make it portable between Python 2 and 3
# doesn't work because then the \r is a literal backslash-r
# instead of a carriage return.
valuelen = limit - len(fmt % r'000') - len(': \r\n')
Gregory Szorc
httppeer: extract code for HTTP header spanning...
r30759 result = []
n = 0
Gregory Szorc
global: use pycompat.xrange()...
r38806 for i in pycompat.xrange(0, len(value), valuelen):
Gregory Szorc
httppeer: extract code for HTTP header spanning...
r30759 n += 1
Augie Fackler
httppeer: always produce native str header keys and values...
r34733 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
Gregory Szorc
httppeer: extract code for HTTP header spanning...
r30759
return result
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 def _wraphttpresponse(resp):
"""Wrap an HTTPResponse with common error handlers.
This ensures that any I/O from any consumer raises the appropriate
error and messaging.
"""
origread = resp.read
class readerproxy(resp.__class__):
def read(self, size=None):
try:
return origread(size)
except httplib.IncompleteRead as e:
# e.expected is an integer if length known or None otherwise.
if e.expected:
av6
httppeer: calculate total expected bytes correctly...
r39519 got = len(e.partial)
total = e.expected + got
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 msg = _('HTTP request error (incomplete response; '
av6
httppeer: calculate total expected bytes correctly...
r39519 'expected %d bytes got %d)') % (total, got)
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 else:
msg = _('HTTP request error (incomplete response)')
Gregory Szorc
error: rename RichIOError to PeerTransportError...
r32023 raise error.PeerTransportError(
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 msg,
hint=_('this may be an intermittent network failure; '
'if the error persists, consider contacting the '
'network or server operator'))
except httplib.HTTPException as e:
Gregory Szorc
error: rename RichIOError to PeerTransportError...
r32023 raise error.PeerTransportError(
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 _('HTTP request error (%s)') % e,
FUJIWARA Katsunori
httppeer: unify hint message for PeerTransportError...
r32087 hint=_('this may be an intermittent network failure; '
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002 'if the error persists, consider contacting the '
'network or server operator'))
resp.__class__ = readerproxy
Augie Fackler
httppeer: add support for httppostargs when we're sending a file...
r33820 class _multifile(object):
def __init__(self, *fileobjs):
for f in fileobjs:
if not util.safehasattr(f, 'length'):
raise ValueError(
'_multifile only supports file objects that '
'have a length but this one does not:', type(f), f)
self._fileobjs = fileobjs
self._index = 0
@property
def length(self):
return sum(f.length for f in self._fileobjs)
def read(self, amt=None):
if amt <= 0:
return ''.join(f.read() for f in self._fileobjs)
parts = []
while amt and self._index < len(self._fileobjs):
parts.append(self._fileobjs[self._index].read(amt))
got = len(parts[-1])
if got < amt:
self._index += 1
amt -= got
return ''.join(parts)
def seek(self, offset, whence=os.SEEK_SET):
if whence != os.SEEK_SET:
raise NotImplementedError(
'_multifile does not support anything other'
' than os.SEEK_SET for whence on seek()')
if offset != 0:
raise NotImplementedError(
'_multifile only supports seeking to start, but that '
'could be fixed if you need it')
for f in self._fileobjs:
f.seek(0)
self._index = 0
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
repobaseurl, cmd, args):
"""Make an HTTP request to run a command for a version 1 client.
``caps`` is a set of known server capabilities. The value may be
None if capabilities are not yet known.
``capablefn`` is a function to evaluate a capability.
``cmd``, ``args``, and ``data`` define the command, its arguments, and
raw data to pass to it.
"""
if cmd == 'pushkey':
args['data'] = ''
data = args.pop('data', None)
headers = args.pop('headers', {})
ui.debug("sending %s command\n" % cmd)
q = [('cmd', cmd)]
headersize = 0
# Important: don't use self.capable() here or else you end up
# with infinite recursion when trying to look up capabilities
# for the first time.
postargsok = caps is not None and 'httppostargs' in caps
# Send arguments via POST.
if postargsok and args:
strargs = urlreq.urlencode(sorted(args.items()))
if not data:
data = strargs
else:
if isinstance(data, bytes):
i = io.BytesIO(data)
i.length = len(data)
data = i
argsio = io.BytesIO(strargs)
argsio.length = len(strargs)
data = _multifile(argsio, data)
headers[r'X-HgArgs-Post'] = len(strargs)
elif args:
# Calling self.capable() can infinite loop if we are calling
# "capabilities". But that command should never accept wire
# protocol arguments. So this should never happen.
assert cmd != 'capabilities'
httpheader = capablefn('httpheader')
if httpheader:
headersize = int(httpheader.split(',', 1)[0])
# Send arguments via HTTP headers.
if headersize > 0:
# The headers can typically carry more data than the URL.
encargs = urlreq.urlencode(sorted(args.items()))
for header, value in encodevalueinheaders(encargs, 'X-HgArg',
headersize):
headers[header] = value
# Send arguments via query string (Mercurial <1.9).
else:
q += sorted(args.items())
qs = '?%s' % urlreq.urlencode(q)
cu = "%s%s" % (repobaseurl, qs)
size = 0
if util.safehasattr(data, 'length'):
size = data.length
elif data is not None:
size = len(data)
if data is not None and r'Content-Type' not in headers:
headers[r'Content-Type'] = r'application/mercurial-0.1'
# Tell the server we accept application/mercurial-0.2 and multiple
# compression formats if the server is capable of emitting those
# payloads.
Gregory Szorc
httppeer: only advertise partial-pull if capabilities are known...
r37574 # Note: Keep this set empty by default, as client advertisement of
# protocol parameters should only occur after the handshake.
protoparams = set()
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567
mediatypes = set()
if caps is not None:
mt = capablefn('httpmediatype')
if mt:
protoparams.add('0.1')
mediatypes = set(mt.split(','))
Gregory Szorc
httppeer: only advertise partial-pull if capabilities are known...
r37574 protoparams.add('partial-pull')
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567 if '0.2tx' in mediatypes:
protoparams.add('0.2')
if '0.2tx' in mediatypes and capablefn('compression'):
# We /could/ compare supported compression formats and prune
# non-mutually supported or error if nothing is mutually supported.
# For now, send the full list to the server and have it error.
comps = [e.wireprotosupport().name for e in
util.compengines.supportedwireengines(util.CLIENTROLE)]
protoparams.add('comp=%s' % ','.join(comps))
if protoparams:
protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
'X-HgProto',
headersize or 1024)
for header, value in protoheaders:
headers[header] = value
Gregory Szorc
httppeer: always add x-hg* headers to Vary header...
r37573
varyheaders = []
for header in headers:
if header.lower().startswith(r'x-hg'):
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567 varyheaders.append(header)
if varyheaders:
Gregory Szorc
httppeer: always add x-hg* headers to Vary header...
r37573 headers[r'Vary'] = r','.join(sorted(varyheaders))
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567
req = requestbuilder(pycompat.strurl(cu), data, headers)
if data is not None:
ui.debug("sending %d bytes\n" % size)
req.add_unredirected_header(r'Content-Length', r'%d' % size)
return req, cu, qs
Augie Fackler
httppeer: work around API differences on urllib Request objects...
r37756 def _reqdata(req):
"""Get request data, if any. If no data, returns None."""
if pycompat.ispy3:
return req.data
if not req.has_data():
return None
return req.get_data()
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566 def sendrequest(ui, opener, req):
"""Send a prepared HTTP request.
Returns the response object.
"""
Boris Feld
httppeer: declare 'dbg' at the function level...
r38139 dbg = ui.debug
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566 if (ui.debugflag
and ui.configbool('devel', 'debug.peer-request')):
line = 'devel-peer-request: %s\n'
Augie Fackler
httppeer: fix debug prints to work on Python 3...
r37754 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
pycompat.bytesurl(req.get_full_url())))
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566 hgargssize = None
for header, value in sorted(req.header_items()):
Augie Fackler
httppeer: no matter what Python 3 might think, http headers are bytes...
r37755 header = pycompat.bytesurl(header)
value = pycompat.bytesurl(value)
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566 if header.startswith('X-hgarg-'):
if hgargssize is None:
hgargssize = 0
hgargssize += len(value)
else:
dbg(line % ' %s %s' % (header, value))
if hgargssize is not None:
dbg(line % ' %d bytes of commands arguments in headers'
% hgargssize)
Augie Fackler
httppeer: work around API differences on urllib Request objects...
r37756 data = _reqdata(req)
if data is not None:
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566 length = getattr(data, 'length', None)
if length is None:
length = len(data)
dbg(line % ' %d bytes of data' % length)
start = util.timer()
Martin von Zweigbergk
httppeer: fix use of uninitialized variable with devel logging...
r38521 res = None
Gregory Szorc
httppeer: move error handling and response wrapping into sendrequest...
r37568 try:
res = opener.open(req)
except urlerr.httperror as inst:
if inst.code == 401:
raise error.Abort(_('authorization failed'))
raise
except httplib.HTTPException as inst:
ui.debug('http error requesting %s\n' %
util.hidepassword(req.get_full_url()))
ui.traceback()
raise IOError(None, inst)
finally:
Boris Feld
httppeer: properly gate debug usage behind debug flag check...
r38138 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
Martin von Zweigbergk
httppeer: fix use of uninitialized variable with devel logging...
r38521 code = res.code if res else -1
Augie Fackler
httppeer: fix debug prints to work on Python 3...
r37754 dbg(line % ' finished in %.4f seconds (%d)'
Martin von Zweigbergk
httppeer: fix use of uninitialized variable with devel logging...
r38521 % (util.timer() - start, code))
Gregory Szorc
httppeer: move error handling and response wrapping into sendrequest...
r37568
# Insert error handlers for common I/O failures.
_wraphttpresponse(res)
Gregory Szorc
httppeer: extract code for performing an HTTP request...
r37566
return res
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 class RedirectedRepoError(error.RepoError):
def __init__(self, msg, respurl):
super(RedirectedRepoError, self).__init__(msg)
self.respurl = respurl
Gregory Szorc
httppeer: support protocol upgrade...
r37576 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
allowcbor=False):
Gregory Szorc
httppeer: extract common response handling into own function...
r37569 # record the url we got redirected to
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 redirected = False
Gregory Szorc
httppeer: extract common response handling into own function...
r37569 respurl = pycompat.bytesurl(resp.geturl())
if respurl.endswith(qs):
respurl = respurl[:-len(qs)]
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 qsdropped = False
else:
qsdropped = True
Gregory Szorc
httppeer: extract common response handling into own function...
r37569 if baseurl.rstrip('/') != respurl.rstrip('/'):
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 redirected = True
Gregory Szorc
httppeer: extract common response handling into own function...
r37569 if not ui.quiet:
ui.warn(_('real URL is %s\n') % respurl)
try:
proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
except AttributeError:
proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
safeurl = util.hidepassword(baseurl)
if proto.startswith('application/hg-error'):
raise error.OutOfBandError(resp.read())
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572
# Pre 1.0 versions of Mercurial used text/plain and
# application/hg-changegroup. We don't support such old servers.
if not proto.startswith('application/mercurial-'):
Gregory Szorc
httppeer: extract common response handling into own function...
r37569 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 msg = _("'%s' does not appear to be an hg repository:\n"
"---%%<--- (%s)\n%s\n---%%<---\n") % (
safeurl, proto or 'no content-type', resp.read(1024))
# Some servers may strip the query string from the redirect. We
# raise a special error type so callers can react to this specially.
if redirected and qsdropped:
raise RedirectedRepoError(msg, respurl)
else:
raise error.RepoError(msg)
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 try:
Gregory Szorc
httppeer: support protocol upgrade...
r37576 subtype = proto.split('-', 1)[1]
# Unless we end up supporting CBOR in the legacy wire protocol,
# this should ONLY be encountered for the initial capabilities
# request during handshake.
if subtype == 'cbor':
if allowcbor:
return respurl, proto, resp
else:
raise error.RepoError(_('unexpected CBOR response from '
'server'))
version_info = tuple([int(n) for n in subtype.split('.')])
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 except ValueError:
raise error.RepoError(_("'%s' sent a broken Content-Type "
"header (%s)") % (safeurl, proto))
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 # TODO consider switching to a decompression reader that uses
# generators.
if version_info == (0, 1):
if compressible:
resp = util.compengines['zlib'].decompressorreader(resp)
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 elif version_info == (0, 2):
# application/mercurial-0.2 always identifies the compression
# engine in the payload header.
av6
httppeer: use util.readexactly() to abort on incomplete responses...
r39520 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
ename = util.readexactly(resp, elen)
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 engine = util.compengines.forwiretype(ename)
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
httppeer: don't accept very old media types (BC)...
r37572 resp = engine.decompressorreader(resp)
else:
raise error.RepoError(_("'%s' uses newer protocol %s") %
Gregory Szorc
httppeer: support protocol upgrade...
r37576 (safeurl, subtype))
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
httppeer: support protocol upgrade...
r37576 return respurl, proto, resp
Gregory Szorc
httppeer: extract common response handling into own function...
r37569
Gregory Szorc
wireproto: move version 1 peer functionality to standalone module (API)...
r37632 class httppeer(wireprotov1peer.wirepeer):
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 def __init__(self, ui, path, url, opener, requestbuilder, caps):
Gregory Szorc
peer: make ui an attribute...
r37337 self.ui = ui
Gregory Szorc
httppeer: make several instance attributes internal (API)...
r33671 self._path = path
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024 self._url = url
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 self._caps = caps
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024 self._urlopener = opener
Gregory Szorc
httppeer: move requestbuilder defaults into makepeer() argument...
r37565 self._requestbuilder = requestbuilder
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
def __del__(self):
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024 for h in self._urlopener.handlers:
h.close()
getattr(h, "close_all", lambda: None)()
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Gregory Szorc
repository: port peer interfaces to zope.interface...
r37336 # Begin of ipeerconnection interface.
Gregory Szorc
httppeer: use peer interface...
r33804
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 def url(self):
Gregory Szorc
httppeer: make several instance attributes internal (API)...
r33671 return self._path
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Gregory Szorc
httppeer: use peer interface...
r33804 def local(self):
return None
def peer(self):
return self
def canpush(self):
return True
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Gregory Szorc
httppeer: use peer interface...
r33804 def close(self):
pass
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Gregory Szorc
repository: port peer interfaces to zope.interface...
r37336 # End of ipeerconnection interface.
Gregory Szorc
httppeer: use peer interface...
r33804
Gregory Szorc
repository: port peer interfaces to zope.interface...
r37336 # Begin of ipeercommands interface.
Gregory Szorc
httppeer: use peer interface...
r33804
def capabilities(self):
Gregory Szorc
httppeer: make several instance attributes internal (API)...
r33671 return self._caps
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Gregory Szorc
repository: port peer interfaces to zope.interface...
r37336 # End of ipeercommands interface.
Gregory Szorc
httppeer: use peer interface...
r33804
Gregory Szorc
httppeer: do decompression inside _callstream...
r30464 def _callstream(self, cmd, _compressible=False, **args):
Pulkit Goyal
py3: handle keyword arguments correctly in httppeer.py...
r35360 args = pycompat.byteskwargs(args)
Gregory Szorc
httppeer: change logic around argument handling...
r36236
Gregory Szorc
httppeer: extract code for creating a request into own function...
r37567 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
self._caps, self.capable,
self._url, cmd, args)
Gregory Szorc
httppeer: advertise and support application/mercurial-0.2...
r30763
Gregory Szorc
httppeer: move error handling and response wrapping into sendrequest...
r37568 resp = sendrequest(self.ui, self._urlopener, req)
Gregory Szorc
httppeer: wrap HTTPResponse.read() globally...
r32002
Gregory Szorc
httppeer: support protocol upgrade...
r37576 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
resp, _compressible)
Gregory Szorc
httppeer: do decompression inside _callstream...
r30464
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 return resp
def _call(self, cmd, **args):
fp = self._callstream(cmd, **args)
try:
return fp.read()
finally:
# if using keepalive, allow connection to be reused
fp.close()
def _callpush(self, cmd, cg, **args):
# have to stream bundle to a temp file because we do not have
# http 1.1 chunked transfer.
types = self.capable('unbundle')
try:
types = types.split(',')
except AttributeError:
# servers older than d1b16a746db6 will send 'unbundle' as a
# boolean capability. They only support headerless/uncompressed
# bundles.
types = [""]
for x in types:
Martin von Zweigbergk
bundle: move writebundle() from changegroup.py to bundle2.py (API)...
r28666 if x in bundle2.bundletypes:
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 type = x
break
Martin von Zweigbergk
bundle: move writebundle() from changegroup.py to bundle2.py (API)...
r28666 tempname = bundle2.writebundle(self.ui, cg, None, type)
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
Augie Fackler
httppeer: headers are native strings...
r36311 headers = {r'Content-Type': r'application/mercurial-0.1'}
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
try:
Matt Mackall
httppeer: use try/except/finally
r25085 r = self._call(cmd, data=fp, headers=headers, **args)
vals = r.split('\n', 1)
if len(vals) < 2:
raise error.ResponseError(_("unexpected response:"), r)
return vals
Augie Fackler
httppeer: explicitly catch urlerr.httperror and re-raise...
r36448 except urlerr.httperror:
# Catch and re-raise these so we don't try and treat them
# like generic socket errors. They lack any values in
# .args on Python 3 which breaks our socket.error block.
raise
Gregory Szorc
global: mass rewrite to use modern exception syntax...
r25660 except socket.error as err:
Matt Mackall
httppeer: use try/except/finally
r25085 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('push failed: %s') % err.args[1])
raise error.Abort(err.args[1])
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 finally:
fp.close()
os.unlink(tempname)
Pierre-Yves David
httppeer: support for _calltwowaystream...
r21074 def _calltwowaystream(self, cmd, fp, **args):
fh = None
Matt Harbison
httppeer: close the temporary bundle file after two-way streaming it...
r23086 fp_ = None
Pierre-Yves David
httppeer: support for _calltwowaystream...
r21074 filename = None
try:
# dump bundle to disk
Yuya Nishihara
py3: wrap tempfile.mkstemp() to use bytes path...
r38182 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
Yuya Nishihara
py3: use r'' instead of sysstr('') to get around code transformer...
r36853 fh = os.fdopen(fd, r"wb")
Pierre-Yves David
httppeer: support for _calltwowaystream...
r21074 d = fp.read(4096)
while d:
fh.write(d)
d = fp.read(4096)
fh.close()
# start http push
Matt Harbison
httppeer: close the temporary bundle file after two-way streaming it...
r23086 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
Augie Fackler
httppeer: headers are native strings...
r36311 headers = {r'Content-Type': r'application/mercurial-0.1'}
Matt Harbison
httppeer: close the temporary bundle file after two-way streaming it...
r23086 return self._callstream(cmd, data=fp_, headers=headers, **args)
Pierre-Yves David
httppeer: support for _calltwowaystream...
r21074 finally:
Matt Harbison
httppeer: close the temporary bundle file after two-way streaming it...
r23086 if fp_ is not None:
fp_.close()
Pierre-Yves David
httppeer: support for _calltwowaystream...
r21074 if fh is not None:
fh.close()
os.unlink(filename)
Pierre-Yves David
wireproto: drop the _decompress method in favor a new call type...
r20905 def _callcompressable(self, cmd, **args):
Gregory Szorc
httppeer: do decompression inside _callstream...
r30464 return self._callstream(cmd, _compressible=True, **args)
Peter Arrenbrecht
peer: introduce real peer classes...
r17192
Mads Kiilerich
httppeer: reintroduce _abort that accidentally was removed in 167047ba3cfa...
r21188 def _abort(self, exception):
raise exception
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
buffersends=True)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 handler = wireprotov2peer.clienthandler(ui, reactor)
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 url = '%s/%s' % (apiurl, permission)
if len(requests) > 1:
url += '/multirequest'
else:
url += '/%s' % requests[0][0]
Gregory Szorc
httppeer: log commands for version 2 peer...
r39467 ui.debug('sending %d commands\n' % len(requests))
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 for command, args, f in requests:
Gregory Szorc
httppeer: log commands for version 2 peer...
r39467 ui.debug('sending command %s: %s\n' % (
command, stringutil.pprint(args, indent=2)))
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 assert not list(handler.callcommand(command, args, f))
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669
# TODO stream this.
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 body = b''.join(map(bytes, handler.flushcommands()))
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669
# TODO modify user-agent to reflect v2
headers = {
r'Accept': wireprotov2server.FRAMINGTYPE,
r'Content-Type': wireprotov2server.FRAMINGTYPE,
}
req = requestbuilder(pycompat.strurl(url), body, headers)
req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
try:
res = opener.open(req)
except urlerr.httperror as e:
if e.code == 401:
raise error.Abort(_('authorization failed'))
raise
except httplib.HTTPException as e:
ui.traceback()
raise IOError(None, e)
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 return handler, res
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669
class queuedcommandfuture(pycompat.futures.Future):
"""Wraps result() on command futures to trigger submission on call."""
def result(self, timeout=None):
if self.done():
return pycompat.futures.Future.result(self, timeout)
self._peerexecutor.sendcommands()
# sendcommands() will restore the original __class__ and self.result
# will resolve to Future.result.
return self.result(timeout)
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 @interfaceutil.implementer(repository.ipeercommandexecutor)
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 class httpv2executor(object):
def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
self._ui = ui
self._opener = opener
self._requestbuilder = requestbuilder
self._apiurl = apiurl
self._descriptor = descriptor
self._sent = False
self._closed = False
self._neededpermissions = set()
self._calls = []
self._futures = weakref.WeakSet()
self._responseexecutor = None
self._responsef = None
def __enter__(self):
return self
def __exit__(self, exctype, excvalue, exctb):
self.close()
def callcommand(self, command, args):
if self._sent:
raise error.ProgrammingError('callcommand() cannot be used after '
'commands are sent')
if self._closed:
raise error.ProgrammingError('callcommand() cannot be used after '
'close()')
# The service advertises which commands are available. So if we attempt
# to call an unknown command or pass an unknown argument, we can screen
# for this.
if command not in self._descriptor['commands']:
raise error.ProgrammingError(
'wire protocol command %s is not available' % command)
cmdinfo = self._descriptor['commands'][command]
unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
if unknownargs:
raise error.ProgrammingError(
'wire protocol command %s does not accept argument: %s' % (
command, ', '.join(sorted(unknownargs))))
self._neededpermissions |= set(cmdinfo['permissions'])
# TODO we /could/ also validate types here, since the API descriptor
# includes types...
f = pycompat.futures.Future()
# Monkeypatch it so result() triggers sendcommands(), otherwise result()
# could deadlock.
f.__class__ = queuedcommandfuture
f._peerexecutor = self
self._futures.add(f)
self._calls.append((command, args, f))
return f
def sendcommands(self):
if self._sent:
return
if not self._calls:
return
self._sent = True
# Unhack any future types so caller sees a clean type and so we
# break reference cycle.
for f in self._futures:
if isinstance(f, queuedcommandfuture):
f.__class__ = pycompat.futures.Future
f._peerexecutor = None
# Mark the future as running and filter out cancelled futures.
calls = [(command, args, f)
for command, args, f in self._calls
if f.set_running_or_notify_cancel()]
# Clear out references, prevent improper object usage.
self._calls = None
if not calls:
return
permissions = set(self._neededpermissions)
if 'push' in permissions and 'pull' in permissions:
permissions.remove('pull')
if len(permissions) > 1:
raise error.RepoError(_('cannot make request requiring multiple '
'permissions: %s') %
_(', ').join(sorted(permissions)))
permission = {
'push': 'rw',
'pull': 'ro',
}[permissions.pop()]
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 handler, resp = sendv2request(
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 self._ui, self._opener, self._requestbuilder, self._apiurl,
permission, calls)
# TODO we probably want to validate the HTTP code, media type, etc.
self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
self._responsef = self._responseexecutor.submit(self._handleresponse,
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 handler, resp)
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669
def close(self):
if self._closed:
return
self.sendcommands()
self._closed = True
if not self._responsef:
return
Gregory Szorc
httppeer: add TODO about delayed handling of ^C...
r39468 # TODO ^C here may not result in immediate program termination.
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 try:
self._responsef.result()
finally:
self._responseexecutor.shutdown(wait=True)
self._responsef = None
self._responseexecutor = None
# If any of our futures are still in progress, mark them as
# errored, otherwise a result() could wait indefinitely.
for f in self._futures:
if not f.done():
f.set_exception(error.ResponseError(
_('unfulfilled command response')))
self._futures = None
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 def _handleresponse(self, handler, resp):
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 # Called in a thread to read the response.
Gregory Szorc
wireprotov2: move response handling out of httppeer...
r37737 while handler.readframe(resp):
pass
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 # TODO implement interface for version 2 peers
Gregory Szorc
interfaceutil: module to stub out zope.interface...
r37828 @interfaceutil.implementer(repository.ipeerconnection,
repository.ipeercapabilities,
repository.ipeerrequests)
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 class httpv2peer(object):
Gregory Szorc
httppeer: support protocol upgrade...
r37576 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
apidescriptor):
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 self.ui = ui
if repourl.endswith('/'):
repourl = repourl[:-1]
Gregory Szorc
httppeer: implement ipeerconnection...
r37627 self._url = repourl
Gregory Szorc
httppeer: support protocol upgrade...
r37576 self._apipath = apipath
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 self._apiurl = '%s/%s' % (repourl, apipath)
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 self._opener = opener
Gregory Szorc
httppeer: support protocol upgrade...
r37576 self._requestbuilder = requestbuilder
self._descriptor = apidescriptor
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501
Gregory Szorc
httppeer: implement ipeerconnection...
r37627 # Start of ipeerconnection.
def url(self):
return self._url
def local(self):
return None
def peer(self):
return self
def canpush(self):
# TODO change once implemented.
return False
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 def close(self):
pass
Gregory Szorc
httppeer: implement ipeerconnection...
r37627 # End of ipeerconnection.
Gregory Szorc
httppeer: basic implementation of capabilities interface...
r37629 # Start of ipeercapabilities.
def capable(self, name):
# The capabilities used internally historically map to capabilities
# advertised from the "capabilities" wire protocol command. However,
# version 2 of that command works differently.
# Maps to commands that are available.
if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
return True
# Other concepts.
if name in ('bundle2',):
return True
return False
def requirecap(self, name, purpose):
if self.capable(name):
return
raise error.CapabilityError(
_('cannot %s; client or remote repository does not support the %r '
'capability') % (purpose, name))
# End of ipeercapabilities.
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501 def _call(self, name, **args):
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 with self.commandexecutor() as e:
return e.callcommand(name, args).result()
Gregory Szorc
wireproto: introduce a reactor for client-side state...
r37561
Gregory Szorc
httppeer: implement command executor for version 2 peer...
r37669 def commandexecutor(self):
return httpv2executor(self.ui, self._opener, self._requestbuilder,
self._apiurl, self._descriptor)
Gregory Szorc
wireproto: crude support for version 2 HTTP peer...
r37501
Gregory Szorc
httppeer: support protocol upgrade...
r37576 # Registry of API service names to metadata about peers that handle it.
#
# The following keys are meaningful:
#
# init
# Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
# apidescriptor) to create a peer.
#
# priority
# Integer priority for the service. If we could choose from multiple
# services, we choose the one with the highest priority.
API_PEERS = {
Gregory Szorc
wireproto: rename HTTPV2 so it less like HTTP/2...
r37662 wireprototypes.HTTP_WIREPROTO_V2: {
Gregory Szorc
httppeer: support protocol upgrade...
r37576 'init': httpv2peer,
'priority': 50,
},
}
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 def performhandshake(ui, url, opener, requestbuilder):
# The handshake is a request to the capabilities command.
caps = None
def capable(x):
raise error.ProgrammingError('should not be called')
Gregory Szorc
httppeer: support protocol upgrade...
r37576 args = {}
# The client advertises support for newer protocols by adding an
# X-HgUpgrade-* header with a list of supported APIs and an
# X-HgProto-* header advertising which serializing formats it supports.
# We only support the HTTP version 2 transport and CBOR responses for
# now.
advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
if advertisev2:
args['headers'] = {
r'X-HgProto-1': r'cbor',
}
args['headers'].update(
encodevalueinheaders(' '.join(sorted(API_PEERS)),
'X-HgUpgrade',
# We don't know the header limit this early.
# So make it small.
1024))
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
capable, url, 'capabilities',
Gregory Szorc
httppeer: support protocol upgrade...
r37576 args)
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 resp = sendrequest(ui, opener, req)
Gregory Szorc
httppeer: detect redirect to URL without query string (issue5860)...
r37851 # The server may redirect us to the repo root, stripping the
# ?cmd=capabilities query string from the URL. The server would likely
# return HTML in this case and ``parsev1commandresponse()`` would raise.
# We catch this special case and re-issue the capabilities request against
# the new URL.
#
# We should ideally not do this, as a redirect that drops the query
# string from the URL is arguably a server bug. (Garbage in, garbage out).
# However, Mercurial clients for several years appeared to handle this
# issue without behavior degradation. And according to issue 5860, it may
# be a longstanding bug in some server implementations. So we allow a
# redirect that drops the query string to "just work."
try:
respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
compressible=False,
allowcbor=advertisev2)
except RedirectedRepoError as e:
req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
capable, e.respurl,
'capabilities', args)
resp = sendrequest(ui, opener, req)
respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
compressible=False,
allowcbor=advertisev2)
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570
try:
Gregory Szorc
httppeer: support protocol upgrade...
r37576 rawdata = resp.read()
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570 finally:
resp.close()
Gregory Szorc
httppeer: support protocol upgrade...
r37576 if not ct.startswith('application/mercurial-'):
raise error.ProgrammingError('unexpected content-type: %s' % ct)
if advertisev2:
if ct == 'application/mercurial-cbor':
try:
Gregory Szorc
httppeer: use our CBOR decoder...
r39476 info = cborutil.decodeall(rawdata)[0]
except cborutil.CBORDecodeError:
Gregory Szorc
httppeer: support protocol upgrade...
r37576 raise error.Abort(_('error decoding CBOR from remote server'),
hint=_('try again and consider contacting '
'the server operator'))
# We got a legacy response. That's fine.
elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
info = {
'v1capabilities': set(rawdata.split())
}
else:
raise error.RepoError(
_('unexpected response type from server: %s') % ct)
else:
info = {
'v1capabilities': set(rawdata.split())
}
return respurl, info
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570
Gregory Szorc
httppeer: allow opener to be passed to makepeer()...
r37571 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
Gregory Szorc
httppeer: move requestbuilder defaults into makepeer() argument...
r37565 """Construct an appropriate HTTP peer instance.
Gregory Szorc
httppeer: allow opener to be passed to makepeer()...
r37571 ``opener`` is an ``url.opener`` that should be used to establish
connections, perform HTTP requests.
Gregory Szorc
httppeer: move requestbuilder defaults into makepeer() argument...
r37565 ``requestbuilder`` is the type used for constructing HTTP requests.
It exists as an argument so extensions can override the default.
"""
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024 u = util.url(path)
if u.query or u.fragment:
raise error.Abort(_('unsupported URL component: "%s"') %
(u.query or u.fragment))
# urllib cannot handle URLs with embedded user or passwd.
url, authinfo = u.authinfo()
ui.debug('using %s\n' % url)
Gregory Szorc
httppeer: allow opener to be passed to makepeer()...
r37571 opener = opener or urlmod.opener(ui, authinfo)
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024
Gregory Szorc
httppeer: support protocol upgrade...
r37576 respurl, info = performhandshake(ui, url, opener, requestbuilder)
# Given the intersection of APIs that both we and the server support,
# sort by their advertised priority and pick the first one.
#
# TODO consider making this request-based and interface driven. For
# example, the caller could say "I want a peer that does X." It's quite
# possible that not all peers would do that. Since we know the service
# capabilities, we could filter out services not meeting the
# requirements. Possibly by consulting the interfaces defined by the
# peer type.
apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
Gregory Szorc
httppeer: perform capabilities request in makepeer()...
r37570
Gregory Szorc
httppeer: support protocol upgrade...
r37576 preferredchoices = sorted(apipeerchoices,
key=lambda x: API_PEERS[x]['priority'],
reverse=True)
for service in preferredchoices:
apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
return API_PEERS[service]['init'](ui, respurl, apipath, opener,
requestbuilder,
info['apis'][service])
# Failed to construct an API peer. Fall back to legacy.
return httppeer(ui, path, respurl, opener, requestbuilder,
info['v1capabilities'])
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024
Gregory Szorc
hg: allow extra arguments to be passed to repo creation (API)...
r39585 def instance(ui, path, create, intents=None, createopts=None):
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 if create:
Pierre-Yves David
error: get Abort from 'error' instead of 'util'...
r26587 raise error.Abort(_('cannot create new http repository'))
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 try:
Gregory Szorc
httppeer: alias url as urlmod...
r36977 if path.startswith('https:') and not urlmod.has_https:
Gregory Szorc
httppeer: remove httpspeer...
r36238 raise error.Abort(_('Python support for SSL and HTTPS '
'is not installed'))
Gregory Szorc
httppeer: remove support for connecting to <0.9.1 servers (BC)...
r35902
Gregory Szorc
httppeer: refactor how httppeer is created (API)...
r37024 inst = makepeer(ui, path)
Gregory Szorc
httppeer: remove support for connecting to <0.9.1 servers (BC)...
r35902
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 return inst
Gregory Szorc
global: mass rewrite to use modern exception syntax...
r25660 except error.RepoError as httpexception:
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 try:
r = statichttprepo.instance(ui, "static-" + path, create)
FUJIWARA Katsunori
httppeer: make a message translatable...
r29241 ui.note(_('(falling back to static-http)\n'))
Peter Arrenbrecht
peer: introduce real peer classes...
r17192 return r
except error.RepoError:
raise httpexception # use the original http RepoError instead