##// END OF EJS Templates
httppeer: no matter what Python 3 might think, http headers are bytes...
Augie Fackler -
r37755:6cb7e3b9 default
parent child Browse files
Show More
@@ -1,951 +1,953
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import io
12 import io
13 import os
13 import os
14 import socket
14 import socket
15 import struct
15 import struct
16 import tempfile
16 import tempfile
17 import weakref
17 import weakref
18
18
19 from .i18n import _
19 from .i18n import _
20 from .thirdparty import (
20 from .thirdparty import (
21 cbor,
21 cbor,
22 )
22 )
23 from .thirdparty.zope import (
23 from .thirdparty.zope import (
24 interface as zi,
24 interface as zi,
25 )
25 )
26 from . import (
26 from . import (
27 bundle2,
27 bundle2,
28 error,
28 error,
29 httpconnection,
29 httpconnection,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 statichttprepo,
32 statichttprepo,
33 url as urlmod,
33 url as urlmod,
34 util,
34 util,
35 wireprotoframing,
35 wireprotoframing,
36 wireprototypes,
36 wireprototypes,
37 wireprotov1peer,
37 wireprotov1peer,
38 wireprotov2peer,
38 wireprotov2peer,
39 wireprotov2server,
39 wireprotov2server,
40 )
40 )
41
41
42 httplib = util.httplib
42 httplib = util.httplib
43 urlerr = util.urlerr
43 urlerr = util.urlerr
44 urlreq = util.urlreq
44 urlreq = util.urlreq
45
45
46 def encodevalueinheaders(value, header, limit):
46 def encodevalueinheaders(value, header, limit):
47 """Encode a string value into multiple HTTP headers.
47 """Encode a string value into multiple HTTP headers.
48
48
49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 name + value will be at most ``limit`` bytes long.
51 name + value will be at most ``limit`` bytes long.
52
52
53 Returns an iterable of 2-tuples consisting of header names and
53 Returns an iterable of 2-tuples consisting of header names and
54 values as native strings.
54 values as native strings.
55 """
55 """
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 # not bytes. This function always takes bytes in as arguments.
57 # not bytes. This function always takes bytes in as arguments.
58 fmt = pycompat.strurl(header) + r'-%s'
58 fmt = pycompat.strurl(header) + r'-%s'
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 # and not a unicode: we're just getting the encoded length anyway,
60 # and not a unicode: we're just getting the encoded length anyway,
61 # and using an r-string to make it portable between Python 2 and 3
61 # and using an r-string to make it portable between Python 2 and 3
62 # doesn't work because then the \r is a literal backslash-r
62 # doesn't work because then the \r is a literal backslash-r
63 # instead of a carriage return.
63 # instead of a carriage return.
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 result = []
65 result = []
66
66
67 n = 0
67 n = 0
68 for i in xrange(0, len(value), valuelen):
68 for i in xrange(0, len(value), valuelen):
69 n += 1
69 n += 1
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71
71
72 return result
72 return result
73
73
74 def _wraphttpresponse(resp):
74 def _wraphttpresponse(resp):
75 """Wrap an HTTPResponse with common error handlers.
75 """Wrap an HTTPResponse with common error handlers.
76
76
77 This ensures that any I/O from any consumer raises the appropriate
77 This ensures that any I/O from any consumer raises the appropriate
78 error and messaging.
78 error and messaging.
79 """
79 """
80 origread = resp.read
80 origread = resp.read
81
81
82 class readerproxy(resp.__class__):
82 class readerproxy(resp.__class__):
83 def read(self, size=None):
83 def read(self, size=None):
84 try:
84 try:
85 return origread(size)
85 return origread(size)
86 except httplib.IncompleteRead as e:
86 except httplib.IncompleteRead as e:
87 # e.expected is an integer if length known or None otherwise.
87 # e.expected is an integer if length known or None otherwise.
88 if e.expected:
88 if e.expected:
89 msg = _('HTTP request error (incomplete response; '
89 msg = _('HTTP request error (incomplete response; '
90 'expected %d bytes got %d)') % (e.expected,
90 'expected %d bytes got %d)') % (e.expected,
91 len(e.partial))
91 len(e.partial))
92 else:
92 else:
93 msg = _('HTTP request error (incomplete response)')
93 msg = _('HTTP request error (incomplete response)')
94
94
95 raise error.PeerTransportError(
95 raise error.PeerTransportError(
96 msg,
96 msg,
97 hint=_('this may be an intermittent network failure; '
97 hint=_('this may be an intermittent network failure; '
98 'if the error persists, consider contacting the '
98 'if the error persists, consider contacting the '
99 'network or server operator'))
99 'network or server operator'))
100 except httplib.HTTPException as e:
100 except httplib.HTTPException as e:
101 raise error.PeerTransportError(
101 raise error.PeerTransportError(
102 _('HTTP request error (%s)') % e,
102 _('HTTP request error (%s)') % e,
103 hint=_('this may be an intermittent network failure; '
103 hint=_('this may be an intermittent network failure; '
104 'if the error persists, consider contacting the '
104 'if the error persists, consider contacting the '
105 'network or server operator'))
105 'network or server operator'))
106
106
107 resp.__class__ = readerproxy
107 resp.__class__ = readerproxy
108
108
109 class _multifile(object):
109 class _multifile(object):
110 def __init__(self, *fileobjs):
110 def __init__(self, *fileobjs):
111 for f in fileobjs:
111 for f in fileobjs:
112 if not util.safehasattr(f, 'length'):
112 if not util.safehasattr(f, 'length'):
113 raise ValueError(
113 raise ValueError(
114 '_multifile only supports file objects that '
114 '_multifile only supports file objects that '
115 'have a length but this one does not:', type(f), f)
115 'have a length but this one does not:', type(f), f)
116 self._fileobjs = fileobjs
116 self._fileobjs = fileobjs
117 self._index = 0
117 self._index = 0
118
118
119 @property
119 @property
120 def length(self):
120 def length(self):
121 return sum(f.length for f in self._fileobjs)
121 return sum(f.length for f in self._fileobjs)
122
122
123 def read(self, amt=None):
123 def read(self, amt=None):
124 if amt <= 0:
124 if amt <= 0:
125 return ''.join(f.read() for f in self._fileobjs)
125 return ''.join(f.read() for f in self._fileobjs)
126 parts = []
126 parts = []
127 while amt and self._index < len(self._fileobjs):
127 while amt and self._index < len(self._fileobjs):
128 parts.append(self._fileobjs[self._index].read(amt))
128 parts.append(self._fileobjs[self._index].read(amt))
129 got = len(parts[-1])
129 got = len(parts[-1])
130 if got < amt:
130 if got < amt:
131 self._index += 1
131 self._index += 1
132 amt -= got
132 amt -= got
133 return ''.join(parts)
133 return ''.join(parts)
134
134
135 def seek(self, offset, whence=os.SEEK_SET):
135 def seek(self, offset, whence=os.SEEK_SET):
136 if whence != os.SEEK_SET:
136 if whence != os.SEEK_SET:
137 raise NotImplementedError(
137 raise NotImplementedError(
138 '_multifile does not support anything other'
138 '_multifile does not support anything other'
139 ' than os.SEEK_SET for whence on seek()')
139 ' than os.SEEK_SET for whence on seek()')
140 if offset != 0:
140 if offset != 0:
141 raise NotImplementedError(
141 raise NotImplementedError(
142 '_multifile only supports seeking to start, but that '
142 '_multifile only supports seeking to start, but that '
143 'could be fixed if you need it')
143 'could be fixed if you need it')
144 for f in self._fileobjs:
144 for f in self._fileobjs:
145 f.seek(0)
145 f.seek(0)
146 self._index = 0
146 self._index = 0
147
147
148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 repobaseurl, cmd, args):
149 repobaseurl, cmd, args):
150 """Make an HTTP request to run a command for a version 1 client.
150 """Make an HTTP request to run a command for a version 1 client.
151
151
152 ``caps`` is a set of known server capabilities. The value may be
152 ``caps`` is a set of known server capabilities. The value may be
153 None if capabilities are not yet known.
153 None if capabilities are not yet known.
154
154
155 ``capablefn`` is a function to evaluate a capability.
155 ``capablefn`` is a function to evaluate a capability.
156
156
157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 raw data to pass to it.
158 raw data to pass to it.
159 """
159 """
160 if cmd == 'pushkey':
160 if cmd == 'pushkey':
161 args['data'] = ''
161 args['data'] = ''
162 data = args.pop('data', None)
162 data = args.pop('data', None)
163 headers = args.pop('headers', {})
163 headers = args.pop('headers', {})
164
164
165 ui.debug("sending %s command\n" % cmd)
165 ui.debug("sending %s command\n" % cmd)
166 q = [('cmd', cmd)]
166 q = [('cmd', cmd)]
167 headersize = 0
167 headersize = 0
168 # Important: don't use self.capable() here or else you end up
168 # Important: don't use self.capable() here or else you end up
169 # with infinite recursion when trying to look up capabilities
169 # with infinite recursion when trying to look up capabilities
170 # for the first time.
170 # for the first time.
171 postargsok = caps is not None and 'httppostargs' in caps
171 postargsok = caps is not None and 'httppostargs' in caps
172
172
173 # Send arguments via POST.
173 # Send arguments via POST.
174 if postargsok and args:
174 if postargsok and args:
175 strargs = urlreq.urlencode(sorted(args.items()))
175 strargs = urlreq.urlencode(sorted(args.items()))
176 if not data:
176 if not data:
177 data = strargs
177 data = strargs
178 else:
178 else:
179 if isinstance(data, bytes):
179 if isinstance(data, bytes):
180 i = io.BytesIO(data)
180 i = io.BytesIO(data)
181 i.length = len(data)
181 i.length = len(data)
182 data = i
182 data = i
183 argsio = io.BytesIO(strargs)
183 argsio = io.BytesIO(strargs)
184 argsio.length = len(strargs)
184 argsio.length = len(strargs)
185 data = _multifile(argsio, data)
185 data = _multifile(argsio, data)
186 headers[r'X-HgArgs-Post'] = len(strargs)
186 headers[r'X-HgArgs-Post'] = len(strargs)
187 elif args:
187 elif args:
188 # Calling self.capable() can infinite loop if we are calling
188 # Calling self.capable() can infinite loop if we are calling
189 # "capabilities". But that command should never accept wire
189 # "capabilities". But that command should never accept wire
190 # protocol arguments. So this should never happen.
190 # protocol arguments. So this should never happen.
191 assert cmd != 'capabilities'
191 assert cmd != 'capabilities'
192 httpheader = capablefn('httpheader')
192 httpheader = capablefn('httpheader')
193 if httpheader:
193 if httpheader:
194 headersize = int(httpheader.split(',', 1)[0])
194 headersize = int(httpheader.split(',', 1)[0])
195
195
196 # Send arguments via HTTP headers.
196 # Send arguments via HTTP headers.
197 if headersize > 0:
197 if headersize > 0:
198 # The headers can typically carry more data than the URL.
198 # The headers can typically carry more data than the URL.
199 encargs = urlreq.urlencode(sorted(args.items()))
199 encargs = urlreq.urlencode(sorted(args.items()))
200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 headersize):
201 headersize):
202 headers[header] = value
202 headers[header] = value
203 # Send arguments via query string (Mercurial <1.9).
203 # Send arguments via query string (Mercurial <1.9).
204 else:
204 else:
205 q += sorted(args.items())
205 q += sorted(args.items())
206
206
207 qs = '?%s' % urlreq.urlencode(q)
207 qs = '?%s' % urlreq.urlencode(q)
208 cu = "%s%s" % (repobaseurl, qs)
208 cu = "%s%s" % (repobaseurl, qs)
209 size = 0
209 size = 0
210 if util.safehasattr(data, 'length'):
210 if util.safehasattr(data, 'length'):
211 size = data.length
211 size = data.length
212 elif data is not None:
212 elif data is not None:
213 size = len(data)
213 size = len(data)
214 if data is not None and r'Content-Type' not in headers:
214 if data is not None and r'Content-Type' not in headers:
215 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216
216
217 # Tell the server we accept application/mercurial-0.2 and multiple
217 # Tell the server we accept application/mercurial-0.2 and multiple
218 # compression formats if the server is capable of emitting those
218 # compression formats if the server is capable of emitting those
219 # payloads.
219 # payloads.
220 # Note: Keep this set empty by default, as client advertisement of
220 # Note: Keep this set empty by default, as client advertisement of
221 # protocol parameters should only occur after the handshake.
221 # protocol parameters should only occur after the handshake.
222 protoparams = set()
222 protoparams = set()
223
223
224 mediatypes = set()
224 mediatypes = set()
225 if caps is not None:
225 if caps is not None:
226 mt = capablefn('httpmediatype')
226 mt = capablefn('httpmediatype')
227 if mt:
227 if mt:
228 protoparams.add('0.1')
228 protoparams.add('0.1')
229 mediatypes = set(mt.split(','))
229 mediatypes = set(mt.split(','))
230
230
231 protoparams.add('partial-pull')
231 protoparams.add('partial-pull')
232
232
233 if '0.2tx' in mediatypes:
233 if '0.2tx' in mediatypes:
234 protoparams.add('0.2')
234 protoparams.add('0.2')
235
235
236 if '0.2tx' in mediatypes and capablefn('compression'):
236 if '0.2tx' in mediatypes and capablefn('compression'):
237 # We /could/ compare supported compression formats and prune
237 # We /could/ compare supported compression formats and prune
238 # non-mutually supported or error if nothing is mutually supported.
238 # non-mutually supported or error if nothing is mutually supported.
239 # For now, send the full list to the server and have it error.
239 # For now, send the full list to the server and have it error.
240 comps = [e.wireprotosupport().name for e in
240 comps = [e.wireprotosupport().name for e in
241 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 protoparams.add('comp=%s' % ','.join(comps))
242 protoparams.add('comp=%s' % ','.join(comps))
243
243
244 if protoparams:
244 if protoparams:
245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 'X-HgProto',
246 'X-HgProto',
247 headersize or 1024)
247 headersize or 1024)
248 for header, value in protoheaders:
248 for header, value in protoheaders:
249 headers[header] = value
249 headers[header] = value
250
250
251 varyheaders = []
251 varyheaders = []
252 for header in headers:
252 for header in headers:
253 if header.lower().startswith(r'x-hg'):
253 if header.lower().startswith(r'x-hg'):
254 varyheaders.append(header)
254 varyheaders.append(header)
255
255
256 if varyheaders:
256 if varyheaders:
257 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258
258
259 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260
260
261 if data is not None:
261 if data is not None:
262 ui.debug("sending %d bytes\n" % size)
262 ui.debug("sending %d bytes\n" % size)
263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264
264
265 return req, cu, qs
265 return req, cu, qs
266
266
267 def sendrequest(ui, opener, req):
267 def sendrequest(ui, opener, req):
268 """Send a prepared HTTP request.
268 """Send a prepared HTTP request.
269
269
270 Returns the response object.
270 Returns the response object.
271 """
271 """
272 if (ui.debugflag
272 if (ui.debugflag
273 and ui.configbool('devel', 'debug.peer-request')):
273 and ui.configbool('devel', 'debug.peer-request')):
274 dbg = ui.debug
274 dbg = ui.debug
275 line = 'devel-peer-request: %s\n'
275 line = 'devel-peer-request: %s\n'
276 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
276 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
277 pycompat.bytesurl(req.get_full_url())))
277 pycompat.bytesurl(req.get_full_url())))
278 hgargssize = None
278 hgargssize = None
279
279
280 for header, value in sorted(req.header_items()):
280 for header, value in sorted(req.header_items()):
281 header = pycompat.bytesurl(header)
282 value = pycompat.bytesurl(value)
281 if header.startswith('X-hgarg-'):
283 if header.startswith('X-hgarg-'):
282 if hgargssize is None:
284 if hgargssize is None:
283 hgargssize = 0
285 hgargssize = 0
284 hgargssize += len(value)
286 hgargssize += len(value)
285 else:
287 else:
286 dbg(line % ' %s %s' % (header, value))
288 dbg(line % ' %s %s' % (header, value))
287
289
288 if hgargssize is not None:
290 if hgargssize is not None:
289 dbg(line % ' %d bytes of commands arguments in headers'
291 dbg(line % ' %d bytes of commands arguments in headers'
290 % hgargssize)
292 % hgargssize)
291
293
292 if req.has_data():
294 if req.has_data():
293 data = req.get_data()
295 data = req.get_data()
294 length = getattr(data, 'length', None)
296 length = getattr(data, 'length', None)
295 if length is None:
297 if length is None:
296 length = len(data)
298 length = len(data)
297 dbg(line % ' %d bytes of data' % length)
299 dbg(line % ' %d bytes of data' % length)
298
300
299 start = util.timer()
301 start = util.timer()
300
302
301 try:
303 try:
302 res = opener.open(req)
304 res = opener.open(req)
303 except urlerr.httperror as inst:
305 except urlerr.httperror as inst:
304 if inst.code == 401:
306 if inst.code == 401:
305 raise error.Abort(_('authorization failed'))
307 raise error.Abort(_('authorization failed'))
306 raise
308 raise
307 except httplib.HTTPException as inst:
309 except httplib.HTTPException as inst:
308 ui.debug('http error requesting %s\n' %
310 ui.debug('http error requesting %s\n' %
309 util.hidepassword(req.get_full_url()))
311 util.hidepassword(req.get_full_url()))
310 ui.traceback()
312 ui.traceback()
311 raise IOError(None, inst)
313 raise IOError(None, inst)
312 finally:
314 finally:
313 if ui.configbool('devel', 'debug.peer-request'):
315 if ui.configbool('devel', 'debug.peer-request'):
314 dbg(line % ' finished in %.4f seconds (%d)'
316 dbg(line % ' finished in %.4f seconds (%d)'
315 % (util.timer() - start, res.code))
317 % (util.timer() - start, res.code))
316
318
317 # Insert error handlers for common I/O failures.
319 # Insert error handlers for common I/O failures.
318 _wraphttpresponse(res)
320 _wraphttpresponse(res)
319
321
320 return res
322 return res
321
323
322 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
324 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
323 allowcbor=False):
325 allowcbor=False):
324 # record the url we got redirected to
326 # record the url we got redirected to
325 respurl = pycompat.bytesurl(resp.geturl())
327 respurl = pycompat.bytesurl(resp.geturl())
326 if respurl.endswith(qs):
328 if respurl.endswith(qs):
327 respurl = respurl[:-len(qs)]
329 respurl = respurl[:-len(qs)]
328 if baseurl.rstrip('/') != respurl.rstrip('/'):
330 if baseurl.rstrip('/') != respurl.rstrip('/'):
329 if not ui.quiet:
331 if not ui.quiet:
330 ui.warn(_('real URL is %s\n') % respurl)
332 ui.warn(_('real URL is %s\n') % respurl)
331
333
332 try:
334 try:
333 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
335 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
334 except AttributeError:
336 except AttributeError:
335 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
337 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
336
338
337 safeurl = util.hidepassword(baseurl)
339 safeurl = util.hidepassword(baseurl)
338 if proto.startswith('application/hg-error'):
340 if proto.startswith('application/hg-error'):
339 raise error.OutOfBandError(resp.read())
341 raise error.OutOfBandError(resp.read())
340
342
341 # Pre 1.0 versions of Mercurial used text/plain and
343 # Pre 1.0 versions of Mercurial used text/plain and
342 # application/hg-changegroup. We don't support such old servers.
344 # application/hg-changegroup. We don't support such old servers.
343 if not proto.startswith('application/mercurial-'):
345 if not proto.startswith('application/mercurial-'):
344 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
346 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
345 raise error.RepoError(
347 raise error.RepoError(
346 _("'%s' does not appear to be an hg repository:\n"
348 _("'%s' does not appear to be an hg repository:\n"
347 "---%%<--- (%s)\n%s\n---%%<---\n")
349 "---%%<--- (%s)\n%s\n---%%<---\n")
348 % (safeurl, proto or 'no content-type', resp.read(1024)))
350 % (safeurl, proto or 'no content-type', resp.read(1024)))
349
351
350 try:
352 try:
351 subtype = proto.split('-', 1)[1]
353 subtype = proto.split('-', 1)[1]
352
354
353 # Unless we end up supporting CBOR in the legacy wire protocol,
355 # Unless we end up supporting CBOR in the legacy wire protocol,
354 # this should ONLY be encountered for the initial capabilities
356 # this should ONLY be encountered for the initial capabilities
355 # request during handshake.
357 # request during handshake.
356 if subtype == 'cbor':
358 if subtype == 'cbor':
357 if allowcbor:
359 if allowcbor:
358 return respurl, proto, resp
360 return respurl, proto, resp
359 else:
361 else:
360 raise error.RepoError(_('unexpected CBOR response from '
362 raise error.RepoError(_('unexpected CBOR response from '
361 'server'))
363 'server'))
362
364
363 version_info = tuple([int(n) for n in subtype.split('.')])
365 version_info = tuple([int(n) for n in subtype.split('.')])
364 except ValueError:
366 except ValueError:
365 raise error.RepoError(_("'%s' sent a broken Content-Type "
367 raise error.RepoError(_("'%s' sent a broken Content-Type "
366 "header (%s)") % (safeurl, proto))
368 "header (%s)") % (safeurl, proto))
367
369
368 # TODO consider switching to a decompression reader that uses
370 # TODO consider switching to a decompression reader that uses
369 # generators.
371 # generators.
370 if version_info == (0, 1):
372 if version_info == (0, 1):
371 if compressible:
373 if compressible:
372 resp = util.compengines['zlib'].decompressorreader(resp)
374 resp = util.compengines['zlib'].decompressorreader(resp)
373
375
374 elif version_info == (0, 2):
376 elif version_info == (0, 2):
375 # application/mercurial-0.2 always identifies the compression
377 # application/mercurial-0.2 always identifies the compression
376 # engine in the payload header.
378 # engine in the payload header.
377 elen = struct.unpack('B', resp.read(1))[0]
379 elen = struct.unpack('B', resp.read(1))[0]
378 ename = resp.read(elen)
380 ename = resp.read(elen)
379 engine = util.compengines.forwiretype(ename)
381 engine = util.compengines.forwiretype(ename)
380
382
381 resp = engine.decompressorreader(resp)
383 resp = engine.decompressorreader(resp)
382 else:
384 else:
383 raise error.RepoError(_("'%s' uses newer protocol %s") %
385 raise error.RepoError(_("'%s' uses newer protocol %s") %
384 (safeurl, subtype))
386 (safeurl, subtype))
385
387
386 return respurl, proto, resp
388 return respurl, proto, resp
387
389
388 class httppeer(wireprotov1peer.wirepeer):
390 class httppeer(wireprotov1peer.wirepeer):
389 def __init__(self, ui, path, url, opener, requestbuilder, caps):
391 def __init__(self, ui, path, url, opener, requestbuilder, caps):
390 self.ui = ui
392 self.ui = ui
391 self._path = path
393 self._path = path
392 self._url = url
394 self._url = url
393 self._caps = caps
395 self._caps = caps
394 self._urlopener = opener
396 self._urlopener = opener
395 self._requestbuilder = requestbuilder
397 self._requestbuilder = requestbuilder
396
398
397 def __del__(self):
399 def __del__(self):
398 for h in self._urlopener.handlers:
400 for h in self._urlopener.handlers:
399 h.close()
401 h.close()
400 getattr(h, "close_all", lambda: None)()
402 getattr(h, "close_all", lambda: None)()
401
403
402 # Begin of ipeerconnection interface.
404 # Begin of ipeerconnection interface.
403
405
404 def url(self):
406 def url(self):
405 return self._path
407 return self._path
406
408
407 def local(self):
409 def local(self):
408 return None
410 return None
409
411
410 def peer(self):
412 def peer(self):
411 return self
413 return self
412
414
413 def canpush(self):
415 def canpush(self):
414 return True
416 return True
415
417
416 def close(self):
418 def close(self):
417 pass
419 pass
418
420
419 # End of ipeerconnection interface.
421 # End of ipeerconnection interface.
420
422
421 # Begin of ipeercommands interface.
423 # Begin of ipeercommands interface.
422
424
423 def capabilities(self):
425 def capabilities(self):
424 return self._caps
426 return self._caps
425
427
426 # End of ipeercommands interface.
428 # End of ipeercommands interface.
427
429
428 # look up capabilities only when needed
430 # look up capabilities only when needed
429
431
430 def _callstream(self, cmd, _compressible=False, **args):
432 def _callstream(self, cmd, _compressible=False, **args):
431 args = pycompat.byteskwargs(args)
433 args = pycompat.byteskwargs(args)
432
434
433 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
435 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
434 self._caps, self.capable,
436 self._caps, self.capable,
435 self._url, cmd, args)
437 self._url, cmd, args)
436
438
437 resp = sendrequest(self.ui, self._urlopener, req)
439 resp = sendrequest(self.ui, self._urlopener, req)
438
440
439 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
441 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
440 resp, _compressible)
442 resp, _compressible)
441
443
442 return resp
444 return resp
443
445
444 def _call(self, cmd, **args):
446 def _call(self, cmd, **args):
445 fp = self._callstream(cmd, **args)
447 fp = self._callstream(cmd, **args)
446 try:
448 try:
447 return fp.read()
449 return fp.read()
448 finally:
450 finally:
449 # if using keepalive, allow connection to be reused
451 # if using keepalive, allow connection to be reused
450 fp.close()
452 fp.close()
451
453
452 def _callpush(self, cmd, cg, **args):
454 def _callpush(self, cmd, cg, **args):
453 # have to stream bundle to a temp file because we do not have
455 # have to stream bundle to a temp file because we do not have
454 # http 1.1 chunked transfer.
456 # http 1.1 chunked transfer.
455
457
456 types = self.capable('unbundle')
458 types = self.capable('unbundle')
457 try:
459 try:
458 types = types.split(',')
460 types = types.split(',')
459 except AttributeError:
461 except AttributeError:
460 # servers older than d1b16a746db6 will send 'unbundle' as a
462 # servers older than d1b16a746db6 will send 'unbundle' as a
461 # boolean capability. They only support headerless/uncompressed
463 # boolean capability. They only support headerless/uncompressed
462 # bundles.
464 # bundles.
463 types = [""]
465 types = [""]
464 for x in types:
466 for x in types:
465 if x in bundle2.bundletypes:
467 if x in bundle2.bundletypes:
466 type = x
468 type = x
467 break
469 break
468
470
469 tempname = bundle2.writebundle(self.ui, cg, None, type)
471 tempname = bundle2.writebundle(self.ui, cg, None, type)
470 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
472 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
471 headers = {r'Content-Type': r'application/mercurial-0.1'}
473 headers = {r'Content-Type': r'application/mercurial-0.1'}
472
474
473 try:
475 try:
474 r = self._call(cmd, data=fp, headers=headers, **args)
476 r = self._call(cmd, data=fp, headers=headers, **args)
475 vals = r.split('\n', 1)
477 vals = r.split('\n', 1)
476 if len(vals) < 2:
478 if len(vals) < 2:
477 raise error.ResponseError(_("unexpected response:"), r)
479 raise error.ResponseError(_("unexpected response:"), r)
478 return vals
480 return vals
479 except urlerr.httperror:
481 except urlerr.httperror:
480 # Catch and re-raise these so we don't try and treat them
482 # Catch and re-raise these so we don't try and treat them
481 # like generic socket errors. They lack any values in
483 # like generic socket errors. They lack any values in
482 # .args on Python 3 which breaks our socket.error block.
484 # .args on Python 3 which breaks our socket.error block.
483 raise
485 raise
484 except socket.error as err:
486 except socket.error as err:
485 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
487 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
486 raise error.Abort(_('push failed: %s') % err.args[1])
488 raise error.Abort(_('push failed: %s') % err.args[1])
487 raise error.Abort(err.args[1])
489 raise error.Abort(err.args[1])
488 finally:
490 finally:
489 fp.close()
491 fp.close()
490 os.unlink(tempname)
492 os.unlink(tempname)
491
493
492 def _calltwowaystream(self, cmd, fp, **args):
494 def _calltwowaystream(self, cmd, fp, **args):
493 fh = None
495 fh = None
494 fp_ = None
496 fp_ = None
495 filename = None
497 filename = None
496 try:
498 try:
497 # dump bundle to disk
499 # dump bundle to disk
498 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
500 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
499 fh = os.fdopen(fd, r"wb")
501 fh = os.fdopen(fd, r"wb")
500 d = fp.read(4096)
502 d = fp.read(4096)
501 while d:
503 while d:
502 fh.write(d)
504 fh.write(d)
503 d = fp.read(4096)
505 d = fp.read(4096)
504 fh.close()
506 fh.close()
505 # start http push
507 # start http push
506 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
508 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
507 headers = {r'Content-Type': r'application/mercurial-0.1'}
509 headers = {r'Content-Type': r'application/mercurial-0.1'}
508 return self._callstream(cmd, data=fp_, headers=headers, **args)
510 return self._callstream(cmd, data=fp_, headers=headers, **args)
509 finally:
511 finally:
510 if fp_ is not None:
512 if fp_ is not None:
511 fp_.close()
513 fp_.close()
512 if fh is not None:
514 if fh is not None:
513 fh.close()
515 fh.close()
514 os.unlink(filename)
516 os.unlink(filename)
515
517
516 def _callcompressable(self, cmd, **args):
518 def _callcompressable(self, cmd, **args):
517 return self._callstream(cmd, _compressible=True, **args)
519 return self._callstream(cmd, _compressible=True, **args)
518
520
519 def _abort(self, exception):
521 def _abort(self, exception):
520 raise exception
522 raise exception
521
523
522 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
524 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
523 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
525 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
524 buffersends=True)
526 buffersends=True)
525
527
526 handler = wireprotov2peer.clienthandler(ui, reactor)
528 handler = wireprotov2peer.clienthandler(ui, reactor)
527
529
528 url = '%s/%s' % (apiurl, permission)
530 url = '%s/%s' % (apiurl, permission)
529
531
530 if len(requests) > 1:
532 if len(requests) > 1:
531 url += '/multirequest'
533 url += '/multirequest'
532 else:
534 else:
533 url += '/%s' % requests[0][0]
535 url += '/%s' % requests[0][0]
534
536
535 for command, args, f in requests:
537 for command, args, f in requests:
536 assert not list(handler.callcommand(command, args, f))
538 assert not list(handler.callcommand(command, args, f))
537
539
538 # TODO stream this.
540 # TODO stream this.
539 body = b''.join(map(bytes, handler.flushcommands()))
541 body = b''.join(map(bytes, handler.flushcommands()))
540
542
541 # TODO modify user-agent to reflect v2
543 # TODO modify user-agent to reflect v2
542 headers = {
544 headers = {
543 r'Accept': wireprotov2server.FRAMINGTYPE,
545 r'Accept': wireprotov2server.FRAMINGTYPE,
544 r'Content-Type': wireprotov2server.FRAMINGTYPE,
546 r'Content-Type': wireprotov2server.FRAMINGTYPE,
545 }
547 }
546
548
547 req = requestbuilder(pycompat.strurl(url), body, headers)
549 req = requestbuilder(pycompat.strurl(url), body, headers)
548 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
550 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
549
551
550 try:
552 try:
551 res = opener.open(req)
553 res = opener.open(req)
552 except urlerr.httperror as e:
554 except urlerr.httperror as e:
553 if e.code == 401:
555 if e.code == 401:
554 raise error.Abort(_('authorization failed'))
556 raise error.Abort(_('authorization failed'))
555
557
556 raise
558 raise
557 except httplib.HTTPException as e:
559 except httplib.HTTPException as e:
558 ui.traceback()
560 ui.traceback()
559 raise IOError(None, e)
561 raise IOError(None, e)
560
562
561 return handler, res
563 return handler, res
562
564
563 class queuedcommandfuture(pycompat.futures.Future):
565 class queuedcommandfuture(pycompat.futures.Future):
564 """Wraps result() on command futures to trigger submission on call."""
566 """Wraps result() on command futures to trigger submission on call."""
565
567
566 def result(self, timeout=None):
568 def result(self, timeout=None):
567 if self.done():
569 if self.done():
568 return pycompat.futures.Future.result(self, timeout)
570 return pycompat.futures.Future.result(self, timeout)
569
571
570 self._peerexecutor.sendcommands()
572 self._peerexecutor.sendcommands()
571
573
572 # sendcommands() will restore the original __class__ and self.result
574 # sendcommands() will restore the original __class__ and self.result
573 # will resolve to Future.result.
575 # will resolve to Future.result.
574 return self.result(timeout)
576 return self.result(timeout)
575
577
576 @zi.implementer(repository.ipeercommandexecutor)
578 @zi.implementer(repository.ipeercommandexecutor)
577 class httpv2executor(object):
579 class httpv2executor(object):
578 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
580 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
579 self._ui = ui
581 self._ui = ui
580 self._opener = opener
582 self._opener = opener
581 self._requestbuilder = requestbuilder
583 self._requestbuilder = requestbuilder
582 self._apiurl = apiurl
584 self._apiurl = apiurl
583 self._descriptor = descriptor
585 self._descriptor = descriptor
584 self._sent = False
586 self._sent = False
585 self._closed = False
587 self._closed = False
586 self._neededpermissions = set()
588 self._neededpermissions = set()
587 self._calls = []
589 self._calls = []
588 self._futures = weakref.WeakSet()
590 self._futures = weakref.WeakSet()
589 self._responseexecutor = None
591 self._responseexecutor = None
590 self._responsef = None
592 self._responsef = None
591
593
592 def __enter__(self):
594 def __enter__(self):
593 return self
595 return self
594
596
595 def __exit__(self, exctype, excvalue, exctb):
597 def __exit__(self, exctype, excvalue, exctb):
596 self.close()
598 self.close()
597
599
598 def callcommand(self, command, args):
600 def callcommand(self, command, args):
599 if self._sent:
601 if self._sent:
600 raise error.ProgrammingError('callcommand() cannot be used after '
602 raise error.ProgrammingError('callcommand() cannot be used after '
601 'commands are sent')
603 'commands are sent')
602
604
603 if self._closed:
605 if self._closed:
604 raise error.ProgrammingError('callcommand() cannot be used after '
606 raise error.ProgrammingError('callcommand() cannot be used after '
605 'close()')
607 'close()')
606
608
607 # The service advertises which commands are available. So if we attempt
609 # The service advertises which commands are available. So if we attempt
608 # to call an unknown command or pass an unknown argument, we can screen
610 # to call an unknown command or pass an unknown argument, we can screen
609 # for this.
611 # for this.
610 if command not in self._descriptor['commands']:
612 if command not in self._descriptor['commands']:
611 raise error.ProgrammingError(
613 raise error.ProgrammingError(
612 'wire protocol command %s is not available' % command)
614 'wire protocol command %s is not available' % command)
613
615
614 cmdinfo = self._descriptor['commands'][command]
616 cmdinfo = self._descriptor['commands'][command]
615 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
617 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
616
618
617 if unknownargs:
619 if unknownargs:
618 raise error.ProgrammingError(
620 raise error.ProgrammingError(
619 'wire protocol command %s does not accept argument: %s' % (
621 'wire protocol command %s does not accept argument: %s' % (
620 command, ', '.join(sorted(unknownargs))))
622 command, ', '.join(sorted(unknownargs))))
621
623
622 self._neededpermissions |= set(cmdinfo['permissions'])
624 self._neededpermissions |= set(cmdinfo['permissions'])
623
625
624 # TODO we /could/ also validate types here, since the API descriptor
626 # TODO we /could/ also validate types here, since the API descriptor
625 # includes types...
627 # includes types...
626
628
627 f = pycompat.futures.Future()
629 f = pycompat.futures.Future()
628
630
629 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
631 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
630 # could deadlock.
632 # could deadlock.
631 f.__class__ = queuedcommandfuture
633 f.__class__ = queuedcommandfuture
632 f._peerexecutor = self
634 f._peerexecutor = self
633
635
634 self._futures.add(f)
636 self._futures.add(f)
635 self._calls.append((command, args, f))
637 self._calls.append((command, args, f))
636
638
637 return f
639 return f
638
640
639 def sendcommands(self):
641 def sendcommands(self):
640 if self._sent:
642 if self._sent:
641 return
643 return
642
644
643 if not self._calls:
645 if not self._calls:
644 return
646 return
645
647
646 self._sent = True
648 self._sent = True
647
649
648 # Unhack any future types so caller sees a clean type and so we
650 # Unhack any future types so caller sees a clean type and so we
649 # break reference cycle.
651 # break reference cycle.
650 for f in self._futures:
652 for f in self._futures:
651 if isinstance(f, queuedcommandfuture):
653 if isinstance(f, queuedcommandfuture):
652 f.__class__ = pycompat.futures.Future
654 f.__class__ = pycompat.futures.Future
653 f._peerexecutor = None
655 f._peerexecutor = None
654
656
655 # Mark the future as running and filter out cancelled futures.
657 # Mark the future as running and filter out cancelled futures.
656 calls = [(command, args, f)
658 calls = [(command, args, f)
657 for command, args, f in self._calls
659 for command, args, f in self._calls
658 if f.set_running_or_notify_cancel()]
660 if f.set_running_or_notify_cancel()]
659
661
660 # Clear out references, prevent improper object usage.
662 # Clear out references, prevent improper object usage.
661 self._calls = None
663 self._calls = None
662
664
663 if not calls:
665 if not calls:
664 return
666 return
665
667
666 permissions = set(self._neededpermissions)
668 permissions = set(self._neededpermissions)
667
669
668 if 'push' in permissions and 'pull' in permissions:
670 if 'push' in permissions and 'pull' in permissions:
669 permissions.remove('pull')
671 permissions.remove('pull')
670
672
671 if len(permissions) > 1:
673 if len(permissions) > 1:
672 raise error.RepoError(_('cannot make request requiring multiple '
674 raise error.RepoError(_('cannot make request requiring multiple '
673 'permissions: %s') %
675 'permissions: %s') %
674 _(', ').join(sorted(permissions)))
676 _(', ').join(sorted(permissions)))
675
677
676 permission = {
678 permission = {
677 'push': 'rw',
679 'push': 'rw',
678 'pull': 'ro',
680 'pull': 'ro',
679 }[permissions.pop()]
681 }[permissions.pop()]
680
682
681 handler, resp = sendv2request(
683 handler, resp = sendv2request(
682 self._ui, self._opener, self._requestbuilder, self._apiurl,
684 self._ui, self._opener, self._requestbuilder, self._apiurl,
683 permission, calls)
685 permission, calls)
684
686
685 # TODO we probably want to validate the HTTP code, media type, etc.
687 # TODO we probably want to validate the HTTP code, media type, etc.
686
688
687 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
689 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
688 self._responsef = self._responseexecutor.submit(self._handleresponse,
690 self._responsef = self._responseexecutor.submit(self._handleresponse,
689 handler, resp)
691 handler, resp)
690
692
691 def close(self):
693 def close(self):
692 if self._closed:
694 if self._closed:
693 return
695 return
694
696
695 self.sendcommands()
697 self.sendcommands()
696
698
697 self._closed = True
699 self._closed = True
698
700
699 if not self._responsef:
701 if not self._responsef:
700 return
702 return
701
703
702 try:
704 try:
703 self._responsef.result()
705 self._responsef.result()
704 finally:
706 finally:
705 self._responseexecutor.shutdown(wait=True)
707 self._responseexecutor.shutdown(wait=True)
706 self._responsef = None
708 self._responsef = None
707 self._responseexecutor = None
709 self._responseexecutor = None
708
710
709 # If any of our futures are still in progress, mark them as
711 # If any of our futures are still in progress, mark them as
710 # errored, otherwise a result() could wait indefinitely.
712 # errored, otherwise a result() could wait indefinitely.
711 for f in self._futures:
713 for f in self._futures:
712 if not f.done():
714 if not f.done():
713 f.set_exception(error.ResponseError(
715 f.set_exception(error.ResponseError(
714 _('unfulfilled command response')))
716 _('unfulfilled command response')))
715
717
716 self._futures = None
718 self._futures = None
717
719
718 def _handleresponse(self, handler, resp):
720 def _handleresponse(self, handler, resp):
719 # Called in a thread to read the response.
721 # Called in a thread to read the response.
720
722
721 while handler.readframe(resp):
723 while handler.readframe(resp):
722 pass
724 pass
723
725
724 # TODO implement interface for version 2 peers
726 # TODO implement interface for version 2 peers
725 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
727 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
726 repository.ipeerrequests)
728 repository.ipeerrequests)
727 class httpv2peer(object):
729 class httpv2peer(object):
728 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
730 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
729 apidescriptor):
731 apidescriptor):
730 self.ui = ui
732 self.ui = ui
731
733
732 if repourl.endswith('/'):
734 if repourl.endswith('/'):
733 repourl = repourl[:-1]
735 repourl = repourl[:-1]
734
736
735 self._url = repourl
737 self._url = repourl
736 self._apipath = apipath
738 self._apipath = apipath
737 self._apiurl = '%s/%s' % (repourl, apipath)
739 self._apiurl = '%s/%s' % (repourl, apipath)
738 self._opener = opener
740 self._opener = opener
739 self._requestbuilder = requestbuilder
741 self._requestbuilder = requestbuilder
740 self._descriptor = apidescriptor
742 self._descriptor = apidescriptor
741
743
742 # Start of ipeerconnection.
744 # Start of ipeerconnection.
743
745
744 def url(self):
746 def url(self):
745 return self._url
747 return self._url
746
748
747 def local(self):
749 def local(self):
748 return None
750 return None
749
751
750 def peer(self):
752 def peer(self):
751 return self
753 return self
752
754
753 def canpush(self):
755 def canpush(self):
754 # TODO change once implemented.
756 # TODO change once implemented.
755 return False
757 return False
756
758
757 def close(self):
759 def close(self):
758 pass
760 pass
759
761
760 # End of ipeerconnection.
762 # End of ipeerconnection.
761
763
762 # Start of ipeercapabilities.
764 # Start of ipeercapabilities.
763
765
764 def capable(self, name):
766 def capable(self, name):
765 # The capabilities used internally historically map to capabilities
767 # The capabilities used internally historically map to capabilities
766 # advertised from the "capabilities" wire protocol command. However,
768 # advertised from the "capabilities" wire protocol command. However,
767 # version 2 of that command works differently.
769 # version 2 of that command works differently.
768
770
769 # Maps to commands that are available.
771 # Maps to commands that are available.
770 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
772 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
771 return True
773 return True
772
774
773 # Other concepts.
775 # Other concepts.
774 if name in ('bundle2',):
776 if name in ('bundle2',):
775 return True
777 return True
776
778
777 return False
779 return False
778
780
779 def requirecap(self, name, purpose):
781 def requirecap(self, name, purpose):
780 if self.capable(name):
782 if self.capable(name):
781 return
783 return
782
784
783 raise error.CapabilityError(
785 raise error.CapabilityError(
784 _('cannot %s; client or remote repository does not support the %r '
786 _('cannot %s; client or remote repository does not support the %r '
785 'capability') % (purpose, name))
787 'capability') % (purpose, name))
786
788
787 # End of ipeercapabilities.
789 # End of ipeercapabilities.
788
790
789 def _call(self, name, **args):
791 def _call(self, name, **args):
790 with self.commandexecutor() as e:
792 with self.commandexecutor() as e:
791 return e.callcommand(name, args).result()
793 return e.callcommand(name, args).result()
792
794
793 def commandexecutor(self):
795 def commandexecutor(self):
794 return httpv2executor(self.ui, self._opener, self._requestbuilder,
796 return httpv2executor(self.ui, self._opener, self._requestbuilder,
795 self._apiurl, self._descriptor)
797 self._apiurl, self._descriptor)
796
798
797 # Registry of API service names to metadata about peers that handle it.
799 # Registry of API service names to metadata about peers that handle it.
798 #
800 #
799 # The following keys are meaningful:
801 # The following keys are meaningful:
800 #
802 #
801 # init
803 # init
802 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
804 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
803 # apidescriptor) to create a peer.
805 # apidescriptor) to create a peer.
804 #
806 #
805 # priority
807 # priority
806 # Integer priority for the service. If we could choose from multiple
808 # Integer priority for the service. If we could choose from multiple
807 # services, we choose the one with the highest priority.
809 # services, we choose the one with the highest priority.
808 API_PEERS = {
810 API_PEERS = {
809 wireprototypes.HTTP_WIREPROTO_V2: {
811 wireprototypes.HTTP_WIREPROTO_V2: {
810 'init': httpv2peer,
812 'init': httpv2peer,
811 'priority': 50,
813 'priority': 50,
812 },
814 },
813 }
815 }
814
816
815 def performhandshake(ui, url, opener, requestbuilder):
817 def performhandshake(ui, url, opener, requestbuilder):
816 # The handshake is a request to the capabilities command.
818 # The handshake is a request to the capabilities command.
817
819
818 caps = None
820 caps = None
819 def capable(x):
821 def capable(x):
820 raise error.ProgrammingError('should not be called')
822 raise error.ProgrammingError('should not be called')
821
823
822 args = {}
824 args = {}
823
825
824 # The client advertises support for newer protocols by adding an
826 # The client advertises support for newer protocols by adding an
825 # X-HgUpgrade-* header with a list of supported APIs and an
827 # X-HgUpgrade-* header with a list of supported APIs and an
826 # X-HgProto-* header advertising which serializing formats it supports.
828 # X-HgProto-* header advertising which serializing formats it supports.
827 # We only support the HTTP version 2 transport and CBOR responses for
829 # We only support the HTTP version 2 transport and CBOR responses for
828 # now.
830 # now.
829 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
831 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
830
832
831 if advertisev2:
833 if advertisev2:
832 args['headers'] = {
834 args['headers'] = {
833 r'X-HgProto-1': r'cbor',
835 r'X-HgProto-1': r'cbor',
834 }
836 }
835
837
836 args['headers'].update(
838 args['headers'].update(
837 encodevalueinheaders(' '.join(sorted(API_PEERS)),
839 encodevalueinheaders(' '.join(sorted(API_PEERS)),
838 'X-HgUpgrade',
840 'X-HgUpgrade',
839 # We don't know the header limit this early.
841 # We don't know the header limit this early.
840 # So make it small.
842 # So make it small.
841 1024))
843 1024))
842
844
843 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
845 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
844 capable, url, 'capabilities',
846 capable, url, 'capabilities',
845 args)
847 args)
846
848
847 resp = sendrequest(ui, opener, req)
849 resp = sendrequest(ui, opener, req)
848
850
849 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
851 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
850 compressible=False,
852 compressible=False,
851 allowcbor=advertisev2)
853 allowcbor=advertisev2)
852
854
853 try:
855 try:
854 rawdata = resp.read()
856 rawdata = resp.read()
855 finally:
857 finally:
856 resp.close()
858 resp.close()
857
859
858 if not ct.startswith('application/mercurial-'):
860 if not ct.startswith('application/mercurial-'):
859 raise error.ProgrammingError('unexpected content-type: %s' % ct)
861 raise error.ProgrammingError('unexpected content-type: %s' % ct)
860
862
861 if advertisev2:
863 if advertisev2:
862 if ct == 'application/mercurial-cbor':
864 if ct == 'application/mercurial-cbor':
863 try:
865 try:
864 info = cbor.loads(rawdata)
866 info = cbor.loads(rawdata)
865 except cbor.CBORDecodeError:
867 except cbor.CBORDecodeError:
866 raise error.Abort(_('error decoding CBOR from remote server'),
868 raise error.Abort(_('error decoding CBOR from remote server'),
867 hint=_('try again and consider contacting '
869 hint=_('try again and consider contacting '
868 'the server operator'))
870 'the server operator'))
869
871
870 # We got a legacy response. That's fine.
872 # We got a legacy response. That's fine.
871 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
873 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
872 info = {
874 info = {
873 'v1capabilities': set(rawdata.split())
875 'v1capabilities': set(rawdata.split())
874 }
876 }
875
877
876 else:
878 else:
877 raise error.RepoError(
879 raise error.RepoError(
878 _('unexpected response type from server: %s') % ct)
880 _('unexpected response type from server: %s') % ct)
879 else:
881 else:
880 info = {
882 info = {
881 'v1capabilities': set(rawdata.split())
883 'v1capabilities': set(rawdata.split())
882 }
884 }
883
885
884 return respurl, info
886 return respurl, info
885
887
886 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
888 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
887 """Construct an appropriate HTTP peer instance.
889 """Construct an appropriate HTTP peer instance.
888
890
889 ``opener`` is an ``url.opener`` that should be used to establish
891 ``opener`` is an ``url.opener`` that should be used to establish
890 connections, perform HTTP requests.
892 connections, perform HTTP requests.
891
893
892 ``requestbuilder`` is the type used for constructing HTTP requests.
894 ``requestbuilder`` is the type used for constructing HTTP requests.
893 It exists as an argument so extensions can override the default.
895 It exists as an argument so extensions can override the default.
894 """
896 """
895 u = util.url(path)
897 u = util.url(path)
896 if u.query or u.fragment:
898 if u.query or u.fragment:
897 raise error.Abort(_('unsupported URL component: "%s"') %
899 raise error.Abort(_('unsupported URL component: "%s"') %
898 (u.query or u.fragment))
900 (u.query or u.fragment))
899
901
900 # urllib cannot handle URLs with embedded user or passwd.
902 # urllib cannot handle URLs with embedded user or passwd.
901 url, authinfo = u.authinfo()
903 url, authinfo = u.authinfo()
902 ui.debug('using %s\n' % url)
904 ui.debug('using %s\n' % url)
903
905
904 opener = opener or urlmod.opener(ui, authinfo)
906 opener = opener or urlmod.opener(ui, authinfo)
905
907
906 respurl, info = performhandshake(ui, url, opener, requestbuilder)
908 respurl, info = performhandshake(ui, url, opener, requestbuilder)
907
909
908 # Given the intersection of APIs that both we and the server support,
910 # Given the intersection of APIs that both we and the server support,
909 # sort by their advertised priority and pick the first one.
911 # sort by their advertised priority and pick the first one.
910 #
912 #
911 # TODO consider making this request-based and interface driven. For
913 # TODO consider making this request-based and interface driven. For
912 # example, the caller could say "I want a peer that does X." It's quite
914 # example, the caller could say "I want a peer that does X." It's quite
913 # possible that not all peers would do that. Since we know the service
915 # possible that not all peers would do that. Since we know the service
914 # capabilities, we could filter out services not meeting the
916 # capabilities, we could filter out services not meeting the
915 # requirements. Possibly by consulting the interfaces defined by the
917 # requirements. Possibly by consulting the interfaces defined by the
916 # peer type.
918 # peer type.
917 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
919 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
918
920
919 preferredchoices = sorted(apipeerchoices,
921 preferredchoices = sorted(apipeerchoices,
920 key=lambda x: API_PEERS[x]['priority'],
922 key=lambda x: API_PEERS[x]['priority'],
921 reverse=True)
923 reverse=True)
922
924
923 for service in preferredchoices:
925 for service in preferredchoices:
924 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
926 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
925
927
926 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
928 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
927 requestbuilder,
929 requestbuilder,
928 info['apis'][service])
930 info['apis'][service])
929
931
930 # Failed to construct an API peer. Fall back to legacy.
932 # Failed to construct an API peer. Fall back to legacy.
931 return httppeer(ui, path, respurl, opener, requestbuilder,
933 return httppeer(ui, path, respurl, opener, requestbuilder,
932 info['v1capabilities'])
934 info['v1capabilities'])
933
935
934 def instance(ui, path, create, intents=None):
936 def instance(ui, path, create, intents=None):
935 if create:
937 if create:
936 raise error.Abort(_('cannot create new http repository'))
938 raise error.Abort(_('cannot create new http repository'))
937 try:
939 try:
938 if path.startswith('https:') and not urlmod.has_https:
940 if path.startswith('https:') and not urlmod.has_https:
939 raise error.Abort(_('Python support for SSL and HTTPS '
941 raise error.Abort(_('Python support for SSL and HTTPS '
940 'is not installed'))
942 'is not installed'))
941
943
942 inst = makepeer(ui, path)
944 inst = makepeer(ui, path)
943
945
944 return inst
946 return inst
945 except error.RepoError as httpexception:
947 except error.RepoError as httpexception:
946 try:
948 try:
947 r = statichttprepo.instance(ui, "static-" + path, create)
949 r = statichttprepo.instance(ui, "static-" + path, create)
948 ui.note(_('(falling back to static-http)\n'))
950 ui.note(_('(falling back to static-http)\n'))
949 return r
951 return r
950 except error.RepoError:
952 except error.RepoError:
951 raise httpexception # use the original http RepoError instead
953 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now