##// END OF EJS Templates
httppeer: add TODO about delayed handling of ^C...
Gregory Szorc -
r39468:3c6f7eeb default
parent child Browse files
Show More
@@ -1,1001 +1,1003 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import io
12 import io
13 import os
13 import os
14 import socket
14 import socket
15 import struct
15 import struct
16 import weakref
16 import weakref
17
17
18 from .i18n import _
18 from .i18n import _
19 from .thirdparty import (
19 from .thirdparty import (
20 cbor,
20 cbor,
21 )
21 )
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 error,
24 error,
25 httpconnection,
25 httpconnection,
26 pycompat,
26 pycompat,
27 repository,
27 repository,
28 statichttprepo,
28 statichttprepo,
29 url as urlmod,
29 url as urlmod,
30 util,
30 util,
31 wireprotoframing,
31 wireprotoframing,
32 wireprototypes,
32 wireprototypes,
33 wireprotov1peer,
33 wireprotov1peer,
34 wireprotov2peer,
34 wireprotov2peer,
35 wireprotov2server,
35 wireprotov2server,
36 )
36 )
37 from .utils import (
37 from .utils import (
38 interfaceutil,
38 interfaceutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 httplib = util.httplib
42 httplib = util.httplib
43 urlerr = util.urlerr
43 urlerr = util.urlerr
44 urlreq = util.urlreq
44 urlreq = util.urlreq
45
45
46 def encodevalueinheaders(value, header, limit):
46 def encodevalueinheaders(value, header, limit):
47 """Encode a string value into multiple HTTP headers.
47 """Encode a string value into multiple HTTP headers.
48
48
49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 name + value will be at most ``limit`` bytes long.
51 name + value will be at most ``limit`` bytes long.
52
52
53 Returns an iterable of 2-tuples consisting of header names and
53 Returns an iterable of 2-tuples consisting of header names and
54 values as native strings.
54 values as native strings.
55 """
55 """
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 # not bytes. This function always takes bytes in as arguments.
57 # not bytes. This function always takes bytes in as arguments.
58 fmt = pycompat.strurl(header) + r'-%s'
58 fmt = pycompat.strurl(header) + r'-%s'
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 # and not a unicode: we're just getting the encoded length anyway,
60 # and not a unicode: we're just getting the encoded length anyway,
61 # and using an r-string to make it portable between Python 2 and 3
61 # and using an r-string to make it portable between Python 2 and 3
62 # doesn't work because then the \r is a literal backslash-r
62 # doesn't work because then the \r is a literal backslash-r
63 # instead of a carriage return.
63 # instead of a carriage return.
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 result = []
65 result = []
66
66
67 n = 0
67 n = 0
68 for i in pycompat.xrange(0, len(value), valuelen):
68 for i in pycompat.xrange(0, len(value), valuelen):
69 n += 1
69 n += 1
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71
71
72 return result
72 return result
73
73
74 def _wraphttpresponse(resp):
74 def _wraphttpresponse(resp):
75 """Wrap an HTTPResponse with common error handlers.
75 """Wrap an HTTPResponse with common error handlers.
76
76
77 This ensures that any I/O from any consumer raises the appropriate
77 This ensures that any I/O from any consumer raises the appropriate
78 error and messaging.
78 error and messaging.
79 """
79 """
80 origread = resp.read
80 origread = resp.read
81
81
82 class readerproxy(resp.__class__):
82 class readerproxy(resp.__class__):
83 def read(self, size=None):
83 def read(self, size=None):
84 try:
84 try:
85 return origread(size)
85 return origread(size)
86 except httplib.IncompleteRead as e:
86 except httplib.IncompleteRead as e:
87 # e.expected is an integer if length known or None otherwise.
87 # e.expected is an integer if length known or None otherwise.
88 if e.expected:
88 if e.expected:
89 msg = _('HTTP request error (incomplete response; '
89 msg = _('HTTP request error (incomplete response; '
90 'expected %d bytes got %d)') % (e.expected,
90 'expected %d bytes got %d)') % (e.expected,
91 len(e.partial))
91 len(e.partial))
92 else:
92 else:
93 msg = _('HTTP request error (incomplete response)')
93 msg = _('HTTP request error (incomplete response)')
94
94
95 raise error.PeerTransportError(
95 raise error.PeerTransportError(
96 msg,
96 msg,
97 hint=_('this may be an intermittent network failure; '
97 hint=_('this may be an intermittent network failure; '
98 'if the error persists, consider contacting the '
98 'if the error persists, consider contacting the '
99 'network or server operator'))
99 'network or server operator'))
100 except httplib.HTTPException as e:
100 except httplib.HTTPException as e:
101 raise error.PeerTransportError(
101 raise error.PeerTransportError(
102 _('HTTP request error (%s)') % e,
102 _('HTTP request error (%s)') % e,
103 hint=_('this may be an intermittent network failure; '
103 hint=_('this may be an intermittent network failure; '
104 'if the error persists, consider contacting the '
104 'if the error persists, consider contacting the '
105 'network or server operator'))
105 'network or server operator'))
106
106
107 resp.__class__ = readerproxy
107 resp.__class__ = readerproxy
108
108
109 class _multifile(object):
109 class _multifile(object):
110 def __init__(self, *fileobjs):
110 def __init__(self, *fileobjs):
111 for f in fileobjs:
111 for f in fileobjs:
112 if not util.safehasattr(f, 'length'):
112 if not util.safehasattr(f, 'length'):
113 raise ValueError(
113 raise ValueError(
114 '_multifile only supports file objects that '
114 '_multifile only supports file objects that '
115 'have a length but this one does not:', type(f), f)
115 'have a length but this one does not:', type(f), f)
116 self._fileobjs = fileobjs
116 self._fileobjs = fileobjs
117 self._index = 0
117 self._index = 0
118
118
119 @property
119 @property
120 def length(self):
120 def length(self):
121 return sum(f.length for f in self._fileobjs)
121 return sum(f.length for f in self._fileobjs)
122
122
123 def read(self, amt=None):
123 def read(self, amt=None):
124 if amt <= 0:
124 if amt <= 0:
125 return ''.join(f.read() for f in self._fileobjs)
125 return ''.join(f.read() for f in self._fileobjs)
126 parts = []
126 parts = []
127 while amt and self._index < len(self._fileobjs):
127 while amt and self._index < len(self._fileobjs):
128 parts.append(self._fileobjs[self._index].read(amt))
128 parts.append(self._fileobjs[self._index].read(amt))
129 got = len(parts[-1])
129 got = len(parts[-1])
130 if got < amt:
130 if got < amt:
131 self._index += 1
131 self._index += 1
132 amt -= got
132 amt -= got
133 return ''.join(parts)
133 return ''.join(parts)
134
134
135 def seek(self, offset, whence=os.SEEK_SET):
135 def seek(self, offset, whence=os.SEEK_SET):
136 if whence != os.SEEK_SET:
136 if whence != os.SEEK_SET:
137 raise NotImplementedError(
137 raise NotImplementedError(
138 '_multifile does not support anything other'
138 '_multifile does not support anything other'
139 ' than os.SEEK_SET for whence on seek()')
139 ' than os.SEEK_SET for whence on seek()')
140 if offset != 0:
140 if offset != 0:
141 raise NotImplementedError(
141 raise NotImplementedError(
142 '_multifile only supports seeking to start, but that '
142 '_multifile only supports seeking to start, but that '
143 'could be fixed if you need it')
143 'could be fixed if you need it')
144 for f in self._fileobjs:
144 for f in self._fileobjs:
145 f.seek(0)
145 f.seek(0)
146 self._index = 0
146 self._index = 0
147
147
148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 repobaseurl, cmd, args):
149 repobaseurl, cmd, args):
150 """Make an HTTP request to run a command for a version 1 client.
150 """Make an HTTP request to run a command for a version 1 client.
151
151
152 ``caps`` is a set of known server capabilities. The value may be
152 ``caps`` is a set of known server capabilities. The value may be
153 None if capabilities are not yet known.
153 None if capabilities are not yet known.
154
154
155 ``capablefn`` is a function to evaluate a capability.
155 ``capablefn`` is a function to evaluate a capability.
156
156
157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 raw data to pass to it.
158 raw data to pass to it.
159 """
159 """
160 if cmd == 'pushkey':
160 if cmd == 'pushkey':
161 args['data'] = ''
161 args['data'] = ''
162 data = args.pop('data', None)
162 data = args.pop('data', None)
163 headers = args.pop('headers', {})
163 headers = args.pop('headers', {})
164
164
165 ui.debug("sending %s command\n" % cmd)
165 ui.debug("sending %s command\n" % cmd)
166 q = [('cmd', cmd)]
166 q = [('cmd', cmd)]
167 headersize = 0
167 headersize = 0
168 # Important: don't use self.capable() here or else you end up
168 # Important: don't use self.capable() here or else you end up
169 # with infinite recursion when trying to look up capabilities
169 # with infinite recursion when trying to look up capabilities
170 # for the first time.
170 # for the first time.
171 postargsok = caps is not None and 'httppostargs' in caps
171 postargsok = caps is not None and 'httppostargs' in caps
172
172
173 # Send arguments via POST.
173 # Send arguments via POST.
174 if postargsok and args:
174 if postargsok and args:
175 strargs = urlreq.urlencode(sorted(args.items()))
175 strargs = urlreq.urlencode(sorted(args.items()))
176 if not data:
176 if not data:
177 data = strargs
177 data = strargs
178 else:
178 else:
179 if isinstance(data, bytes):
179 if isinstance(data, bytes):
180 i = io.BytesIO(data)
180 i = io.BytesIO(data)
181 i.length = len(data)
181 i.length = len(data)
182 data = i
182 data = i
183 argsio = io.BytesIO(strargs)
183 argsio = io.BytesIO(strargs)
184 argsio.length = len(strargs)
184 argsio.length = len(strargs)
185 data = _multifile(argsio, data)
185 data = _multifile(argsio, data)
186 headers[r'X-HgArgs-Post'] = len(strargs)
186 headers[r'X-HgArgs-Post'] = len(strargs)
187 elif args:
187 elif args:
188 # Calling self.capable() can infinite loop if we are calling
188 # Calling self.capable() can infinite loop if we are calling
189 # "capabilities". But that command should never accept wire
189 # "capabilities". But that command should never accept wire
190 # protocol arguments. So this should never happen.
190 # protocol arguments. So this should never happen.
191 assert cmd != 'capabilities'
191 assert cmd != 'capabilities'
192 httpheader = capablefn('httpheader')
192 httpheader = capablefn('httpheader')
193 if httpheader:
193 if httpheader:
194 headersize = int(httpheader.split(',', 1)[0])
194 headersize = int(httpheader.split(',', 1)[0])
195
195
196 # Send arguments via HTTP headers.
196 # Send arguments via HTTP headers.
197 if headersize > 0:
197 if headersize > 0:
198 # The headers can typically carry more data than the URL.
198 # The headers can typically carry more data than the URL.
199 encargs = urlreq.urlencode(sorted(args.items()))
199 encargs = urlreq.urlencode(sorted(args.items()))
200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 headersize):
201 headersize):
202 headers[header] = value
202 headers[header] = value
203 # Send arguments via query string (Mercurial <1.9).
203 # Send arguments via query string (Mercurial <1.9).
204 else:
204 else:
205 q += sorted(args.items())
205 q += sorted(args.items())
206
206
207 qs = '?%s' % urlreq.urlencode(q)
207 qs = '?%s' % urlreq.urlencode(q)
208 cu = "%s%s" % (repobaseurl, qs)
208 cu = "%s%s" % (repobaseurl, qs)
209 size = 0
209 size = 0
210 if util.safehasattr(data, 'length'):
210 if util.safehasattr(data, 'length'):
211 size = data.length
211 size = data.length
212 elif data is not None:
212 elif data is not None:
213 size = len(data)
213 size = len(data)
214 if data is not None and r'Content-Type' not in headers:
214 if data is not None and r'Content-Type' not in headers:
215 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216
216
217 # Tell the server we accept application/mercurial-0.2 and multiple
217 # Tell the server we accept application/mercurial-0.2 and multiple
218 # compression formats if the server is capable of emitting those
218 # compression formats if the server is capable of emitting those
219 # payloads.
219 # payloads.
220 # Note: Keep this set empty by default, as client advertisement of
220 # Note: Keep this set empty by default, as client advertisement of
221 # protocol parameters should only occur after the handshake.
221 # protocol parameters should only occur after the handshake.
222 protoparams = set()
222 protoparams = set()
223
223
224 mediatypes = set()
224 mediatypes = set()
225 if caps is not None:
225 if caps is not None:
226 mt = capablefn('httpmediatype')
226 mt = capablefn('httpmediatype')
227 if mt:
227 if mt:
228 protoparams.add('0.1')
228 protoparams.add('0.1')
229 mediatypes = set(mt.split(','))
229 mediatypes = set(mt.split(','))
230
230
231 protoparams.add('partial-pull')
231 protoparams.add('partial-pull')
232
232
233 if '0.2tx' in mediatypes:
233 if '0.2tx' in mediatypes:
234 protoparams.add('0.2')
234 protoparams.add('0.2')
235
235
236 if '0.2tx' in mediatypes and capablefn('compression'):
236 if '0.2tx' in mediatypes and capablefn('compression'):
237 # We /could/ compare supported compression formats and prune
237 # We /could/ compare supported compression formats and prune
238 # non-mutually supported or error if nothing is mutually supported.
238 # non-mutually supported or error if nothing is mutually supported.
239 # For now, send the full list to the server and have it error.
239 # For now, send the full list to the server and have it error.
240 comps = [e.wireprotosupport().name for e in
240 comps = [e.wireprotosupport().name for e in
241 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 protoparams.add('comp=%s' % ','.join(comps))
242 protoparams.add('comp=%s' % ','.join(comps))
243
243
244 if protoparams:
244 if protoparams:
245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 'X-HgProto',
246 'X-HgProto',
247 headersize or 1024)
247 headersize or 1024)
248 for header, value in protoheaders:
248 for header, value in protoheaders:
249 headers[header] = value
249 headers[header] = value
250
250
251 varyheaders = []
251 varyheaders = []
252 for header in headers:
252 for header in headers:
253 if header.lower().startswith(r'x-hg'):
253 if header.lower().startswith(r'x-hg'):
254 varyheaders.append(header)
254 varyheaders.append(header)
255
255
256 if varyheaders:
256 if varyheaders:
257 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258
258
259 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260
260
261 if data is not None:
261 if data is not None:
262 ui.debug("sending %d bytes\n" % size)
262 ui.debug("sending %d bytes\n" % size)
263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264
264
265 return req, cu, qs
265 return req, cu, qs
266
266
267 def _reqdata(req):
267 def _reqdata(req):
268 """Get request data, if any. If no data, returns None."""
268 """Get request data, if any. If no data, returns None."""
269 if pycompat.ispy3:
269 if pycompat.ispy3:
270 return req.data
270 return req.data
271 if not req.has_data():
271 if not req.has_data():
272 return None
272 return None
273 return req.get_data()
273 return req.get_data()
274
274
275 def sendrequest(ui, opener, req):
275 def sendrequest(ui, opener, req):
276 """Send a prepared HTTP request.
276 """Send a prepared HTTP request.
277
277
278 Returns the response object.
278 Returns the response object.
279 """
279 """
280 dbg = ui.debug
280 dbg = ui.debug
281 if (ui.debugflag
281 if (ui.debugflag
282 and ui.configbool('devel', 'debug.peer-request')):
282 and ui.configbool('devel', 'debug.peer-request')):
283 line = 'devel-peer-request: %s\n'
283 line = 'devel-peer-request: %s\n'
284 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
284 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
285 pycompat.bytesurl(req.get_full_url())))
285 pycompat.bytesurl(req.get_full_url())))
286 hgargssize = None
286 hgargssize = None
287
287
288 for header, value in sorted(req.header_items()):
288 for header, value in sorted(req.header_items()):
289 header = pycompat.bytesurl(header)
289 header = pycompat.bytesurl(header)
290 value = pycompat.bytesurl(value)
290 value = pycompat.bytesurl(value)
291 if header.startswith('X-hgarg-'):
291 if header.startswith('X-hgarg-'):
292 if hgargssize is None:
292 if hgargssize is None:
293 hgargssize = 0
293 hgargssize = 0
294 hgargssize += len(value)
294 hgargssize += len(value)
295 else:
295 else:
296 dbg(line % ' %s %s' % (header, value))
296 dbg(line % ' %s %s' % (header, value))
297
297
298 if hgargssize is not None:
298 if hgargssize is not None:
299 dbg(line % ' %d bytes of commands arguments in headers'
299 dbg(line % ' %d bytes of commands arguments in headers'
300 % hgargssize)
300 % hgargssize)
301 data = _reqdata(req)
301 data = _reqdata(req)
302 if data is not None:
302 if data is not None:
303 length = getattr(data, 'length', None)
303 length = getattr(data, 'length', None)
304 if length is None:
304 if length is None:
305 length = len(data)
305 length = len(data)
306 dbg(line % ' %d bytes of data' % length)
306 dbg(line % ' %d bytes of data' % length)
307
307
308 start = util.timer()
308 start = util.timer()
309
309
310 res = None
310 res = None
311 try:
311 try:
312 res = opener.open(req)
312 res = opener.open(req)
313 except urlerr.httperror as inst:
313 except urlerr.httperror as inst:
314 if inst.code == 401:
314 if inst.code == 401:
315 raise error.Abort(_('authorization failed'))
315 raise error.Abort(_('authorization failed'))
316 raise
316 raise
317 except httplib.HTTPException as inst:
317 except httplib.HTTPException as inst:
318 ui.debug('http error requesting %s\n' %
318 ui.debug('http error requesting %s\n' %
319 util.hidepassword(req.get_full_url()))
319 util.hidepassword(req.get_full_url()))
320 ui.traceback()
320 ui.traceback()
321 raise IOError(None, inst)
321 raise IOError(None, inst)
322 finally:
322 finally:
323 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
323 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
324 code = res.code if res else -1
324 code = res.code if res else -1
325 dbg(line % ' finished in %.4f seconds (%d)'
325 dbg(line % ' finished in %.4f seconds (%d)'
326 % (util.timer() - start, code))
326 % (util.timer() - start, code))
327
327
328 # Insert error handlers for common I/O failures.
328 # Insert error handlers for common I/O failures.
329 _wraphttpresponse(res)
329 _wraphttpresponse(res)
330
330
331 return res
331 return res
332
332
333 class RedirectedRepoError(error.RepoError):
333 class RedirectedRepoError(error.RepoError):
334 def __init__(self, msg, respurl):
334 def __init__(self, msg, respurl):
335 super(RedirectedRepoError, self).__init__(msg)
335 super(RedirectedRepoError, self).__init__(msg)
336 self.respurl = respurl
336 self.respurl = respurl
337
337
338 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
338 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
339 allowcbor=False):
339 allowcbor=False):
340 # record the url we got redirected to
340 # record the url we got redirected to
341 redirected = False
341 redirected = False
342 respurl = pycompat.bytesurl(resp.geturl())
342 respurl = pycompat.bytesurl(resp.geturl())
343 if respurl.endswith(qs):
343 if respurl.endswith(qs):
344 respurl = respurl[:-len(qs)]
344 respurl = respurl[:-len(qs)]
345 qsdropped = False
345 qsdropped = False
346 else:
346 else:
347 qsdropped = True
347 qsdropped = True
348
348
349 if baseurl.rstrip('/') != respurl.rstrip('/'):
349 if baseurl.rstrip('/') != respurl.rstrip('/'):
350 redirected = True
350 redirected = True
351 if not ui.quiet:
351 if not ui.quiet:
352 ui.warn(_('real URL is %s\n') % respurl)
352 ui.warn(_('real URL is %s\n') % respurl)
353
353
354 try:
354 try:
355 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
355 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
356 except AttributeError:
356 except AttributeError:
357 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
357 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
358
358
359 safeurl = util.hidepassword(baseurl)
359 safeurl = util.hidepassword(baseurl)
360 if proto.startswith('application/hg-error'):
360 if proto.startswith('application/hg-error'):
361 raise error.OutOfBandError(resp.read())
361 raise error.OutOfBandError(resp.read())
362
362
363 # Pre 1.0 versions of Mercurial used text/plain and
363 # Pre 1.0 versions of Mercurial used text/plain and
364 # application/hg-changegroup. We don't support such old servers.
364 # application/hg-changegroup. We don't support such old servers.
365 if not proto.startswith('application/mercurial-'):
365 if not proto.startswith('application/mercurial-'):
366 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
366 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
367 msg = _("'%s' does not appear to be an hg repository:\n"
367 msg = _("'%s' does not appear to be an hg repository:\n"
368 "---%%<--- (%s)\n%s\n---%%<---\n") % (
368 "---%%<--- (%s)\n%s\n---%%<---\n") % (
369 safeurl, proto or 'no content-type', resp.read(1024))
369 safeurl, proto or 'no content-type', resp.read(1024))
370
370
371 # Some servers may strip the query string from the redirect. We
371 # Some servers may strip the query string from the redirect. We
372 # raise a special error type so callers can react to this specially.
372 # raise a special error type so callers can react to this specially.
373 if redirected and qsdropped:
373 if redirected and qsdropped:
374 raise RedirectedRepoError(msg, respurl)
374 raise RedirectedRepoError(msg, respurl)
375 else:
375 else:
376 raise error.RepoError(msg)
376 raise error.RepoError(msg)
377
377
378 try:
378 try:
379 subtype = proto.split('-', 1)[1]
379 subtype = proto.split('-', 1)[1]
380
380
381 # Unless we end up supporting CBOR in the legacy wire protocol,
381 # Unless we end up supporting CBOR in the legacy wire protocol,
382 # this should ONLY be encountered for the initial capabilities
382 # this should ONLY be encountered for the initial capabilities
383 # request during handshake.
383 # request during handshake.
384 if subtype == 'cbor':
384 if subtype == 'cbor':
385 if allowcbor:
385 if allowcbor:
386 return respurl, proto, resp
386 return respurl, proto, resp
387 else:
387 else:
388 raise error.RepoError(_('unexpected CBOR response from '
388 raise error.RepoError(_('unexpected CBOR response from '
389 'server'))
389 'server'))
390
390
391 version_info = tuple([int(n) for n in subtype.split('.')])
391 version_info = tuple([int(n) for n in subtype.split('.')])
392 except ValueError:
392 except ValueError:
393 raise error.RepoError(_("'%s' sent a broken Content-Type "
393 raise error.RepoError(_("'%s' sent a broken Content-Type "
394 "header (%s)") % (safeurl, proto))
394 "header (%s)") % (safeurl, proto))
395
395
396 # TODO consider switching to a decompression reader that uses
396 # TODO consider switching to a decompression reader that uses
397 # generators.
397 # generators.
398 if version_info == (0, 1):
398 if version_info == (0, 1):
399 if compressible:
399 if compressible:
400 resp = util.compengines['zlib'].decompressorreader(resp)
400 resp = util.compengines['zlib'].decompressorreader(resp)
401
401
402 elif version_info == (0, 2):
402 elif version_info == (0, 2):
403 # application/mercurial-0.2 always identifies the compression
403 # application/mercurial-0.2 always identifies the compression
404 # engine in the payload header.
404 # engine in the payload header.
405 elen = struct.unpack('B', resp.read(1))[0]
405 elen = struct.unpack('B', resp.read(1))[0]
406 ename = resp.read(elen)
406 ename = resp.read(elen)
407 engine = util.compengines.forwiretype(ename)
407 engine = util.compengines.forwiretype(ename)
408
408
409 resp = engine.decompressorreader(resp)
409 resp = engine.decompressorreader(resp)
410 else:
410 else:
411 raise error.RepoError(_("'%s' uses newer protocol %s") %
411 raise error.RepoError(_("'%s' uses newer protocol %s") %
412 (safeurl, subtype))
412 (safeurl, subtype))
413
413
414 return respurl, proto, resp
414 return respurl, proto, resp
415
415
416 class httppeer(wireprotov1peer.wirepeer):
416 class httppeer(wireprotov1peer.wirepeer):
417 def __init__(self, ui, path, url, opener, requestbuilder, caps):
417 def __init__(self, ui, path, url, opener, requestbuilder, caps):
418 self.ui = ui
418 self.ui = ui
419 self._path = path
419 self._path = path
420 self._url = url
420 self._url = url
421 self._caps = caps
421 self._caps = caps
422 self._urlopener = opener
422 self._urlopener = opener
423 self._requestbuilder = requestbuilder
423 self._requestbuilder = requestbuilder
424
424
425 def __del__(self):
425 def __del__(self):
426 for h in self._urlopener.handlers:
426 for h in self._urlopener.handlers:
427 h.close()
427 h.close()
428 getattr(h, "close_all", lambda: None)()
428 getattr(h, "close_all", lambda: None)()
429
429
430 # Begin of ipeerconnection interface.
430 # Begin of ipeerconnection interface.
431
431
432 def url(self):
432 def url(self):
433 return self._path
433 return self._path
434
434
435 def local(self):
435 def local(self):
436 return None
436 return None
437
437
438 def peer(self):
438 def peer(self):
439 return self
439 return self
440
440
441 def canpush(self):
441 def canpush(self):
442 return True
442 return True
443
443
444 def close(self):
444 def close(self):
445 pass
445 pass
446
446
447 # End of ipeerconnection interface.
447 # End of ipeerconnection interface.
448
448
449 # Begin of ipeercommands interface.
449 # Begin of ipeercommands interface.
450
450
451 def capabilities(self):
451 def capabilities(self):
452 return self._caps
452 return self._caps
453
453
454 # End of ipeercommands interface.
454 # End of ipeercommands interface.
455
455
456 def _callstream(self, cmd, _compressible=False, **args):
456 def _callstream(self, cmd, _compressible=False, **args):
457 args = pycompat.byteskwargs(args)
457 args = pycompat.byteskwargs(args)
458
458
459 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
459 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
460 self._caps, self.capable,
460 self._caps, self.capable,
461 self._url, cmd, args)
461 self._url, cmd, args)
462
462
463 resp = sendrequest(self.ui, self._urlopener, req)
463 resp = sendrequest(self.ui, self._urlopener, req)
464
464
465 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
465 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
466 resp, _compressible)
466 resp, _compressible)
467
467
468 return resp
468 return resp
469
469
470 def _call(self, cmd, **args):
470 def _call(self, cmd, **args):
471 fp = self._callstream(cmd, **args)
471 fp = self._callstream(cmd, **args)
472 try:
472 try:
473 return fp.read()
473 return fp.read()
474 finally:
474 finally:
475 # if using keepalive, allow connection to be reused
475 # if using keepalive, allow connection to be reused
476 fp.close()
476 fp.close()
477
477
478 def _callpush(self, cmd, cg, **args):
478 def _callpush(self, cmd, cg, **args):
479 # have to stream bundle to a temp file because we do not have
479 # have to stream bundle to a temp file because we do not have
480 # http 1.1 chunked transfer.
480 # http 1.1 chunked transfer.
481
481
482 types = self.capable('unbundle')
482 types = self.capable('unbundle')
483 try:
483 try:
484 types = types.split(',')
484 types = types.split(',')
485 except AttributeError:
485 except AttributeError:
486 # servers older than d1b16a746db6 will send 'unbundle' as a
486 # servers older than d1b16a746db6 will send 'unbundle' as a
487 # boolean capability. They only support headerless/uncompressed
487 # boolean capability. They only support headerless/uncompressed
488 # bundles.
488 # bundles.
489 types = [""]
489 types = [""]
490 for x in types:
490 for x in types:
491 if x in bundle2.bundletypes:
491 if x in bundle2.bundletypes:
492 type = x
492 type = x
493 break
493 break
494
494
495 tempname = bundle2.writebundle(self.ui, cg, None, type)
495 tempname = bundle2.writebundle(self.ui, cg, None, type)
496 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
496 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
497 headers = {r'Content-Type': r'application/mercurial-0.1'}
497 headers = {r'Content-Type': r'application/mercurial-0.1'}
498
498
499 try:
499 try:
500 r = self._call(cmd, data=fp, headers=headers, **args)
500 r = self._call(cmd, data=fp, headers=headers, **args)
501 vals = r.split('\n', 1)
501 vals = r.split('\n', 1)
502 if len(vals) < 2:
502 if len(vals) < 2:
503 raise error.ResponseError(_("unexpected response:"), r)
503 raise error.ResponseError(_("unexpected response:"), r)
504 return vals
504 return vals
505 except urlerr.httperror:
505 except urlerr.httperror:
506 # Catch and re-raise these so we don't try and treat them
506 # Catch and re-raise these so we don't try and treat them
507 # like generic socket errors. They lack any values in
507 # like generic socket errors. They lack any values in
508 # .args on Python 3 which breaks our socket.error block.
508 # .args on Python 3 which breaks our socket.error block.
509 raise
509 raise
510 except socket.error as err:
510 except socket.error as err:
511 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
511 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
512 raise error.Abort(_('push failed: %s') % err.args[1])
512 raise error.Abort(_('push failed: %s') % err.args[1])
513 raise error.Abort(err.args[1])
513 raise error.Abort(err.args[1])
514 finally:
514 finally:
515 fp.close()
515 fp.close()
516 os.unlink(tempname)
516 os.unlink(tempname)
517
517
518 def _calltwowaystream(self, cmd, fp, **args):
518 def _calltwowaystream(self, cmd, fp, **args):
519 fh = None
519 fh = None
520 fp_ = None
520 fp_ = None
521 filename = None
521 filename = None
522 try:
522 try:
523 # dump bundle to disk
523 # dump bundle to disk
524 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
524 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
525 fh = os.fdopen(fd, r"wb")
525 fh = os.fdopen(fd, r"wb")
526 d = fp.read(4096)
526 d = fp.read(4096)
527 while d:
527 while d:
528 fh.write(d)
528 fh.write(d)
529 d = fp.read(4096)
529 d = fp.read(4096)
530 fh.close()
530 fh.close()
531 # start http push
531 # start http push
532 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
532 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
533 headers = {r'Content-Type': r'application/mercurial-0.1'}
533 headers = {r'Content-Type': r'application/mercurial-0.1'}
534 return self._callstream(cmd, data=fp_, headers=headers, **args)
534 return self._callstream(cmd, data=fp_, headers=headers, **args)
535 finally:
535 finally:
536 if fp_ is not None:
536 if fp_ is not None:
537 fp_.close()
537 fp_.close()
538 if fh is not None:
538 if fh is not None:
539 fh.close()
539 fh.close()
540 os.unlink(filename)
540 os.unlink(filename)
541
541
542 def _callcompressable(self, cmd, **args):
542 def _callcompressable(self, cmd, **args):
543 return self._callstream(cmd, _compressible=True, **args)
543 return self._callstream(cmd, _compressible=True, **args)
544
544
545 def _abort(self, exception):
545 def _abort(self, exception):
546 raise exception
546 raise exception
547
547
548 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
548 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
549 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
549 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
550 buffersends=True)
550 buffersends=True)
551
551
552 handler = wireprotov2peer.clienthandler(ui, reactor)
552 handler = wireprotov2peer.clienthandler(ui, reactor)
553
553
554 url = '%s/%s' % (apiurl, permission)
554 url = '%s/%s' % (apiurl, permission)
555
555
556 if len(requests) > 1:
556 if len(requests) > 1:
557 url += '/multirequest'
557 url += '/multirequest'
558 else:
558 else:
559 url += '/%s' % requests[0][0]
559 url += '/%s' % requests[0][0]
560
560
561 ui.debug('sending %d commands\n' % len(requests))
561 ui.debug('sending %d commands\n' % len(requests))
562 for command, args, f in requests:
562 for command, args, f in requests:
563 ui.debug('sending command %s: %s\n' % (
563 ui.debug('sending command %s: %s\n' % (
564 command, stringutil.pprint(args, indent=2)))
564 command, stringutil.pprint(args, indent=2)))
565 assert not list(handler.callcommand(command, args, f))
565 assert not list(handler.callcommand(command, args, f))
566
566
567 # TODO stream this.
567 # TODO stream this.
568 body = b''.join(map(bytes, handler.flushcommands()))
568 body = b''.join(map(bytes, handler.flushcommands()))
569
569
570 # TODO modify user-agent to reflect v2
570 # TODO modify user-agent to reflect v2
571 headers = {
571 headers = {
572 r'Accept': wireprotov2server.FRAMINGTYPE,
572 r'Accept': wireprotov2server.FRAMINGTYPE,
573 r'Content-Type': wireprotov2server.FRAMINGTYPE,
573 r'Content-Type': wireprotov2server.FRAMINGTYPE,
574 }
574 }
575
575
576 req = requestbuilder(pycompat.strurl(url), body, headers)
576 req = requestbuilder(pycompat.strurl(url), body, headers)
577 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
577 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
578
578
579 try:
579 try:
580 res = opener.open(req)
580 res = opener.open(req)
581 except urlerr.httperror as e:
581 except urlerr.httperror as e:
582 if e.code == 401:
582 if e.code == 401:
583 raise error.Abort(_('authorization failed'))
583 raise error.Abort(_('authorization failed'))
584
584
585 raise
585 raise
586 except httplib.HTTPException as e:
586 except httplib.HTTPException as e:
587 ui.traceback()
587 ui.traceback()
588 raise IOError(None, e)
588 raise IOError(None, e)
589
589
590 return handler, res
590 return handler, res
591
591
592 class queuedcommandfuture(pycompat.futures.Future):
592 class queuedcommandfuture(pycompat.futures.Future):
593 """Wraps result() on command futures to trigger submission on call."""
593 """Wraps result() on command futures to trigger submission on call."""
594
594
595 def result(self, timeout=None):
595 def result(self, timeout=None):
596 if self.done():
596 if self.done():
597 return pycompat.futures.Future.result(self, timeout)
597 return pycompat.futures.Future.result(self, timeout)
598
598
599 self._peerexecutor.sendcommands()
599 self._peerexecutor.sendcommands()
600
600
601 # sendcommands() will restore the original __class__ and self.result
601 # sendcommands() will restore the original __class__ and self.result
602 # will resolve to Future.result.
602 # will resolve to Future.result.
603 return self.result(timeout)
603 return self.result(timeout)
604
604
605 @interfaceutil.implementer(repository.ipeercommandexecutor)
605 @interfaceutil.implementer(repository.ipeercommandexecutor)
606 class httpv2executor(object):
606 class httpv2executor(object):
607 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
607 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
608 self._ui = ui
608 self._ui = ui
609 self._opener = opener
609 self._opener = opener
610 self._requestbuilder = requestbuilder
610 self._requestbuilder = requestbuilder
611 self._apiurl = apiurl
611 self._apiurl = apiurl
612 self._descriptor = descriptor
612 self._descriptor = descriptor
613 self._sent = False
613 self._sent = False
614 self._closed = False
614 self._closed = False
615 self._neededpermissions = set()
615 self._neededpermissions = set()
616 self._calls = []
616 self._calls = []
617 self._futures = weakref.WeakSet()
617 self._futures = weakref.WeakSet()
618 self._responseexecutor = None
618 self._responseexecutor = None
619 self._responsef = None
619 self._responsef = None
620
620
621 def __enter__(self):
621 def __enter__(self):
622 return self
622 return self
623
623
624 def __exit__(self, exctype, excvalue, exctb):
624 def __exit__(self, exctype, excvalue, exctb):
625 self.close()
625 self.close()
626
626
627 def callcommand(self, command, args):
627 def callcommand(self, command, args):
628 if self._sent:
628 if self._sent:
629 raise error.ProgrammingError('callcommand() cannot be used after '
629 raise error.ProgrammingError('callcommand() cannot be used after '
630 'commands are sent')
630 'commands are sent')
631
631
632 if self._closed:
632 if self._closed:
633 raise error.ProgrammingError('callcommand() cannot be used after '
633 raise error.ProgrammingError('callcommand() cannot be used after '
634 'close()')
634 'close()')
635
635
636 # The service advertises which commands are available. So if we attempt
636 # The service advertises which commands are available. So if we attempt
637 # to call an unknown command or pass an unknown argument, we can screen
637 # to call an unknown command or pass an unknown argument, we can screen
638 # for this.
638 # for this.
639 if command not in self._descriptor['commands']:
639 if command not in self._descriptor['commands']:
640 raise error.ProgrammingError(
640 raise error.ProgrammingError(
641 'wire protocol command %s is not available' % command)
641 'wire protocol command %s is not available' % command)
642
642
643 cmdinfo = self._descriptor['commands'][command]
643 cmdinfo = self._descriptor['commands'][command]
644 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
644 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
645
645
646 if unknownargs:
646 if unknownargs:
647 raise error.ProgrammingError(
647 raise error.ProgrammingError(
648 'wire protocol command %s does not accept argument: %s' % (
648 'wire protocol command %s does not accept argument: %s' % (
649 command, ', '.join(sorted(unknownargs))))
649 command, ', '.join(sorted(unknownargs))))
650
650
651 self._neededpermissions |= set(cmdinfo['permissions'])
651 self._neededpermissions |= set(cmdinfo['permissions'])
652
652
653 # TODO we /could/ also validate types here, since the API descriptor
653 # TODO we /could/ also validate types here, since the API descriptor
654 # includes types...
654 # includes types...
655
655
656 f = pycompat.futures.Future()
656 f = pycompat.futures.Future()
657
657
658 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
658 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
659 # could deadlock.
659 # could deadlock.
660 f.__class__ = queuedcommandfuture
660 f.__class__ = queuedcommandfuture
661 f._peerexecutor = self
661 f._peerexecutor = self
662
662
663 self._futures.add(f)
663 self._futures.add(f)
664 self._calls.append((command, args, f))
664 self._calls.append((command, args, f))
665
665
666 return f
666 return f
667
667
668 def sendcommands(self):
668 def sendcommands(self):
669 if self._sent:
669 if self._sent:
670 return
670 return
671
671
672 if not self._calls:
672 if not self._calls:
673 return
673 return
674
674
675 self._sent = True
675 self._sent = True
676
676
677 # Unhack any future types so caller sees a clean type and so we
677 # Unhack any future types so caller sees a clean type and so we
678 # break reference cycle.
678 # break reference cycle.
679 for f in self._futures:
679 for f in self._futures:
680 if isinstance(f, queuedcommandfuture):
680 if isinstance(f, queuedcommandfuture):
681 f.__class__ = pycompat.futures.Future
681 f.__class__ = pycompat.futures.Future
682 f._peerexecutor = None
682 f._peerexecutor = None
683
683
684 # Mark the future as running and filter out cancelled futures.
684 # Mark the future as running and filter out cancelled futures.
685 calls = [(command, args, f)
685 calls = [(command, args, f)
686 for command, args, f in self._calls
686 for command, args, f in self._calls
687 if f.set_running_or_notify_cancel()]
687 if f.set_running_or_notify_cancel()]
688
688
689 # Clear out references, prevent improper object usage.
689 # Clear out references, prevent improper object usage.
690 self._calls = None
690 self._calls = None
691
691
692 if not calls:
692 if not calls:
693 return
693 return
694
694
695 permissions = set(self._neededpermissions)
695 permissions = set(self._neededpermissions)
696
696
697 if 'push' in permissions and 'pull' in permissions:
697 if 'push' in permissions and 'pull' in permissions:
698 permissions.remove('pull')
698 permissions.remove('pull')
699
699
700 if len(permissions) > 1:
700 if len(permissions) > 1:
701 raise error.RepoError(_('cannot make request requiring multiple '
701 raise error.RepoError(_('cannot make request requiring multiple '
702 'permissions: %s') %
702 'permissions: %s') %
703 _(', ').join(sorted(permissions)))
703 _(', ').join(sorted(permissions)))
704
704
705 permission = {
705 permission = {
706 'push': 'rw',
706 'push': 'rw',
707 'pull': 'ro',
707 'pull': 'ro',
708 }[permissions.pop()]
708 }[permissions.pop()]
709
709
710 handler, resp = sendv2request(
710 handler, resp = sendv2request(
711 self._ui, self._opener, self._requestbuilder, self._apiurl,
711 self._ui, self._opener, self._requestbuilder, self._apiurl,
712 permission, calls)
712 permission, calls)
713
713
714 # TODO we probably want to validate the HTTP code, media type, etc.
714 # TODO we probably want to validate the HTTP code, media type, etc.
715
715
716 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
716 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
717 self._responsef = self._responseexecutor.submit(self._handleresponse,
717 self._responsef = self._responseexecutor.submit(self._handleresponse,
718 handler, resp)
718 handler, resp)
719
719
720 def close(self):
720 def close(self):
721 if self._closed:
721 if self._closed:
722 return
722 return
723
723
724 self.sendcommands()
724 self.sendcommands()
725
725
726 self._closed = True
726 self._closed = True
727
727
728 if not self._responsef:
728 if not self._responsef:
729 return
729 return
730
730
731 # TODO ^C here may not result in immediate program termination.
732
731 try:
733 try:
732 self._responsef.result()
734 self._responsef.result()
733 finally:
735 finally:
734 self._responseexecutor.shutdown(wait=True)
736 self._responseexecutor.shutdown(wait=True)
735 self._responsef = None
737 self._responsef = None
736 self._responseexecutor = None
738 self._responseexecutor = None
737
739
738 # If any of our futures are still in progress, mark them as
740 # If any of our futures are still in progress, mark them as
739 # errored, otherwise a result() could wait indefinitely.
741 # errored, otherwise a result() could wait indefinitely.
740 for f in self._futures:
742 for f in self._futures:
741 if not f.done():
743 if not f.done():
742 f.set_exception(error.ResponseError(
744 f.set_exception(error.ResponseError(
743 _('unfulfilled command response')))
745 _('unfulfilled command response')))
744
746
745 self._futures = None
747 self._futures = None
746
748
747 def _handleresponse(self, handler, resp):
749 def _handleresponse(self, handler, resp):
748 # Called in a thread to read the response.
750 # Called in a thread to read the response.
749
751
750 while handler.readframe(resp):
752 while handler.readframe(resp):
751 pass
753 pass
752
754
753 # TODO implement interface for version 2 peers
755 # TODO implement interface for version 2 peers
754 @interfaceutil.implementer(repository.ipeerconnection,
756 @interfaceutil.implementer(repository.ipeerconnection,
755 repository.ipeercapabilities,
757 repository.ipeercapabilities,
756 repository.ipeerrequests)
758 repository.ipeerrequests)
757 class httpv2peer(object):
759 class httpv2peer(object):
758 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
760 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
759 apidescriptor):
761 apidescriptor):
760 self.ui = ui
762 self.ui = ui
761
763
762 if repourl.endswith('/'):
764 if repourl.endswith('/'):
763 repourl = repourl[:-1]
765 repourl = repourl[:-1]
764
766
765 self._url = repourl
767 self._url = repourl
766 self._apipath = apipath
768 self._apipath = apipath
767 self._apiurl = '%s/%s' % (repourl, apipath)
769 self._apiurl = '%s/%s' % (repourl, apipath)
768 self._opener = opener
770 self._opener = opener
769 self._requestbuilder = requestbuilder
771 self._requestbuilder = requestbuilder
770 self._descriptor = apidescriptor
772 self._descriptor = apidescriptor
771
773
772 # Start of ipeerconnection.
774 # Start of ipeerconnection.
773
775
774 def url(self):
776 def url(self):
775 return self._url
777 return self._url
776
778
777 def local(self):
779 def local(self):
778 return None
780 return None
779
781
780 def peer(self):
782 def peer(self):
781 return self
783 return self
782
784
783 def canpush(self):
785 def canpush(self):
784 # TODO change once implemented.
786 # TODO change once implemented.
785 return False
787 return False
786
788
787 def close(self):
789 def close(self):
788 pass
790 pass
789
791
790 # End of ipeerconnection.
792 # End of ipeerconnection.
791
793
792 # Start of ipeercapabilities.
794 # Start of ipeercapabilities.
793
795
794 def capable(self, name):
796 def capable(self, name):
795 # The capabilities used internally historically map to capabilities
797 # The capabilities used internally historically map to capabilities
796 # advertised from the "capabilities" wire protocol command. However,
798 # advertised from the "capabilities" wire protocol command. However,
797 # version 2 of that command works differently.
799 # version 2 of that command works differently.
798
800
799 # Maps to commands that are available.
801 # Maps to commands that are available.
800 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
802 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
801 return True
803 return True
802
804
803 # Other concepts.
805 # Other concepts.
804 if name in ('bundle2',):
806 if name in ('bundle2',):
805 return True
807 return True
806
808
807 return False
809 return False
808
810
809 def requirecap(self, name, purpose):
811 def requirecap(self, name, purpose):
810 if self.capable(name):
812 if self.capable(name):
811 return
813 return
812
814
813 raise error.CapabilityError(
815 raise error.CapabilityError(
814 _('cannot %s; client or remote repository does not support the %r '
816 _('cannot %s; client or remote repository does not support the %r '
815 'capability') % (purpose, name))
817 'capability') % (purpose, name))
816
818
817 # End of ipeercapabilities.
819 # End of ipeercapabilities.
818
820
819 def _call(self, name, **args):
821 def _call(self, name, **args):
820 with self.commandexecutor() as e:
822 with self.commandexecutor() as e:
821 return e.callcommand(name, args).result()
823 return e.callcommand(name, args).result()
822
824
823 def commandexecutor(self):
825 def commandexecutor(self):
824 return httpv2executor(self.ui, self._opener, self._requestbuilder,
826 return httpv2executor(self.ui, self._opener, self._requestbuilder,
825 self._apiurl, self._descriptor)
827 self._apiurl, self._descriptor)
826
828
827 # Registry of API service names to metadata about peers that handle it.
829 # Registry of API service names to metadata about peers that handle it.
828 #
830 #
829 # The following keys are meaningful:
831 # The following keys are meaningful:
830 #
832 #
831 # init
833 # init
832 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
834 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
833 # apidescriptor) to create a peer.
835 # apidescriptor) to create a peer.
834 #
836 #
835 # priority
837 # priority
836 # Integer priority for the service. If we could choose from multiple
838 # Integer priority for the service. If we could choose from multiple
837 # services, we choose the one with the highest priority.
839 # services, we choose the one with the highest priority.
838 API_PEERS = {
840 API_PEERS = {
839 wireprototypes.HTTP_WIREPROTO_V2: {
841 wireprototypes.HTTP_WIREPROTO_V2: {
840 'init': httpv2peer,
842 'init': httpv2peer,
841 'priority': 50,
843 'priority': 50,
842 },
844 },
843 }
845 }
844
846
845 def performhandshake(ui, url, opener, requestbuilder):
847 def performhandshake(ui, url, opener, requestbuilder):
846 # The handshake is a request to the capabilities command.
848 # The handshake is a request to the capabilities command.
847
849
848 caps = None
850 caps = None
849 def capable(x):
851 def capable(x):
850 raise error.ProgrammingError('should not be called')
852 raise error.ProgrammingError('should not be called')
851
853
852 args = {}
854 args = {}
853
855
854 # The client advertises support for newer protocols by adding an
856 # The client advertises support for newer protocols by adding an
855 # X-HgUpgrade-* header with a list of supported APIs and an
857 # X-HgUpgrade-* header with a list of supported APIs and an
856 # X-HgProto-* header advertising which serializing formats it supports.
858 # X-HgProto-* header advertising which serializing formats it supports.
857 # We only support the HTTP version 2 transport and CBOR responses for
859 # We only support the HTTP version 2 transport and CBOR responses for
858 # now.
860 # now.
859 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
861 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
860
862
861 if advertisev2:
863 if advertisev2:
862 args['headers'] = {
864 args['headers'] = {
863 r'X-HgProto-1': r'cbor',
865 r'X-HgProto-1': r'cbor',
864 }
866 }
865
867
866 args['headers'].update(
868 args['headers'].update(
867 encodevalueinheaders(' '.join(sorted(API_PEERS)),
869 encodevalueinheaders(' '.join(sorted(API_PEERS)),
868 'X-HgUpgrade',
870 'X-HgUpgrade',
869 # We don't know the header limit this early.
871 # We don't know the header limit this early.
870 # So make it small.
872 # So make it small.
871 1024))
873 1024))
872
874
873 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
875 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
874 capable, url, 'capabilities',
876 capable, url, 'capabilities',
875 args)
877 args)
876 resp = sendrequest(ui, opener, req)
878 resp = sendrequest(ui, opener, req)
877
879
878 # The server may redirect us to the repo root, stripping the
880 # The server may redirect us to the repo root, stripping the
879 # ?cmd=capabilities query string from the URL. The server would likely
881 # ?cmd=capabilities query string from the URL. The server would likely
880 # return HTML in this case and ``parsev1commandresponse()`` would raise.
882 # return HTML in this case and ``parsev1commandresponse()`` would raise.
881 # We catch this special case and re-issue the capabilities request against
883 # We catch this special case and re-issue the capabilities request against
882 # the new URL.
884 # the new URL.
883 #
885 #
884 # We should ideally not do this, as a redirect that drops the query
886 # We should ideally not do this, as a redirect that drops the query
885 # string from the URL is arguably a server bug. (Garbage in, garbage out).
887 # string from the URL is arguably a server bug. (Garbage in, garbage out).
886 # However, Mercurial clients for several years appeared to handle this
888 # However, Mercurial clients for several years appeared to handle this
887 # issue without behavior degradation. And according to issue 5860, it may
889 # issue without behavior degradation. And according to issue 5860, it may
888 # be a longstanding bug in some server implementations. So we allow a
890 # be a longstanding bug in some server implementations. So we allow a
889 # redirect that drops the query string to "just work."
891 # redirect that drops the query string to "just work."
890 try:
892 try:
891 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
893 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
892 compressible=False,
894 compressible=False,
893 allowcbor=advertisev2)
895 allowcbor=advertisev2)
894 except RedirectedRepoError as e:
896 except RedirectedRepoError as e:
895 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
897 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
896 capable, e.respurl,
898 capable, e.respurl,
897 'capabilities', args)
899 'capabilities', args)
898 resp = sendrequest(ui, opener, req)
900 resp = sendrequest(ui, opener, req)
899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
901 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
900 compressible=False,
902 compressible=False,
901 allowcbor=advertisev2)
903 allowcbor=advertisev2)
902
904
903 try:
905 try:
904 rawdata = resp.read()
906 rawdata = resp.read()
905 finally:
907 finally:
906 resp.close()
908 resp.close()
907
909
908 if not ct.startswith('application/mercurial-'):
910 if not ct.startswith('application/mercurial-'):
909 raise error.ProgrammingError('unexpected content-type: %s' % ct)
911 raise error.ProgrammingError('unexpected content-type: %s' % ct)
910
912
911 if advertisev2:
913 if advertisev2:
912 if ct == 'application/mercurial-cbor':
914 if ct == 'application/mercurial-cbor':
913 try:
915 try:
914 info = cbor.loads(rawdata)
916 info = cbor.loads(rawdata)
915 except cbor.CBORDecodeError:
917 except cbor.CBORDecodeError:
916 raise error.Abort(_('error decoding CBOR from remote server'),
918 raise error.Abort(_('error decoding CBOR from remote server'),
917 hint=_('try again and consider contacting '
919 hint=_('try again and consider contacting '
918 'the server operator'))
920 'the server operator'))
919
921
920 # We got a legacy response. That's fine.
922 # We got a legacy response. That's fine.
921 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
923 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
922 info = {
924 info = {
923 'v1capabilities': set(rawdata.split())
925 'v1capabilities': set(rawdata.split())
924 }
926 }
925
927
926 else:
928 else:
927 raise error.RepoError(
929 raise error.RepoError(
928 _('unexpected response type from server: %s') % ct)
930 _('unexpected response type from server: %s') % ct)
929 else:
931 else:
930 info = {
932 info = {
931 'v1capabilities': set(rawdata.split())
933 'v1capabilities': set(rawdata.split())
932 }
934 }
933
935
934 return respurl, info
936 return respurl, info
935
937
936 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
938 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
937 """Construct an appropriate HTTP peer instance.
939 """Construct an appropriate HTTP peer instance.
938
940
939 ``opener`` is an ``url.opener`` that should be used to establish
941 ``opener`` is an ``url.opener`` that should be used to establish
940 connections, perform HTTP requests.
942 connections, perform HTTP requests.
941
943
942 ``requestbuilder`` is the type used for constructing HTTP requests.
944 ``requestbuilder`` is the type used for constructing HTTP requests.
943 It exists as an argument so extensions can override the default.
945 It exists as an argument so extensions can override the default.
944 """
946 """
945 u = util.url(path)
947 u = util.url(path)
946 if u.query or u.fragment:
948 if u.query or u.fragment:
947 raise error.Abort(_('unsupported URL component: "%s"') %
949 raise error.Abort(_('unsupported URL component: "%s"') %
948 (u.query or u.fragment))
950 (u.query or u.fragment))
949
951
950 # urllib cannot handle URLs with embedded user or passwd.
952 # urllib cannot handle URLs with embedded user or passwd.
951 url, authinfo = u.authinfo()
953 url, authinfo = u.authinfo()
952 ui.debug('using %s\n' % url)
954 ui.debug('using %s\n' % url)
953
955
954 opener = opener or urlmod.opener(ui, authinfo)
956 opener = opener or urlmod.opener(ui, authinfo)
955
957
956 respurl, info = performhandshake(ui, url, opener, requestbuilder)
958 respurl, info = performhandshake(ui, url, opener, requestbuilder)
957
959
958 # Given the intersection of APIs that both we and the server support,
960 # Given the intersection of APIs that both we and the server support,
959 # sort by their advertised priority and pick the first one.
961 # sort by their advertised priority and pick the first one.
960 #
962 #
961 # TODO consider making this request-based and interface driven. For
963 # TODO consider making this request-based and interface driven. For
962 # example, the caller could say "I want a peer that does X." It's quite
964 # example, the caller could say "I want a peer that does X." It's quite
963 # possible that not all peers would do that. Since we know the service
965 # possible that not all peers would do that. Since we know the service
964 # capabilities, we could filter out services not meeting the
966 # capabilities, we could filter out services not meeting the
965 # requirements. Possibly by consulting the interfaces defined by the
967 # requirements. Possibly by consulting the interfaces defined by the
966 # peer type.
968 # peer type.
967 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
969 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
968
970
969 preferredchoices = sorted(apipeerchoices,
971 preferredchoices = sorted(apipeerchoices,
970 key=lambda x: API_PEERS[x]['priority'],
972 key=lambda x: API_PEERS[x]['priority'],
971 reverse=True)
973 reverse=True)
972
974
973 for service in preferredchoices:
975 for service in preferredchoices:
974 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
976 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
975
977
976 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
978 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
977 requestbuilder,
979 requestbuilder,
978 info['apis'][service])
980 info['apis'][service])
979
981
980 # Failed to construct an API peer. Fall back to legacy.
982 # Failed to construct an API peer. Fall back to legacy.
981 return httppeer(ui, path, respurl, opener, requestbuilder,
983 return httppeer(ui, path, respurl, opener, requestbuilder,
982 info['v1capabilities'])
984 info['v1capabilities'])
983
985
984 def instance(ui, path, create, intents=None):
986 def instance(ui, path, create, intents=None):
985 if create:
987 if create:
986 raise error.Abort(_('cannot create new http repository'))
988 raise error.Abort(_('cannot create new http repository'))
987 try:
989 try:
988 if path.startswith('https:') and not urlmod.has_https:
990 if path.startswith('https:') and not urlmod.has_https:
989 raise error.Abort(_('Python support for SSL and HTTPS '
991 raise error.Abort(_('Python support for SSL and HTTPS '
990 'is not installed'))
992 'is not installed'))
991
993
992 inst = makepeer(ui, path)
994 inst = makepeer(ui, path)
993
995
994 return inst
996 return inst
995 except error.RepoError as httpexception:
997 except error.RepoError as httpexception:
996 try:
998 try:
997 r = statichttprepo.instance(ui, "static-" + path, create)
999 r = statichttprepo.instance(ui, "static-" + path, create)
998 ui.note(_('(falling back to static-http)\n'))
1000 ui.note(_('(falling back to static-http)\n'))
999 return r
1001 return r
1000 except error.RepoError:
1002 except error.RepoError:
1001 raise httpexception # use the original http RepoError instead
1003 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now