##// END OF EJS Templates
httppeer: implement command executor for version 2 peer...
Gregory Szorc -
r37669:950294e2 default
parent child Browse files
Show More
@@ -1,817 +1,1001 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import io
12 import io
13 import os
13 import os
14 import socket
14 import socket
15 import struct
15 import struct
16 import sys
16 import tempfile
17 import tempfile
18 import weakref
17
19
18 from .i18n import _
20 from .i18n import _
19 from .thirdparty import (
21 from .thirdparty import (
20 cbor,
22 cbor,
21 )
23 )
22 from .thirdparty.zope import (
24 from .thirdparty.zope import (
23 interface as zi,
25 interface as zi,
24 )
26 )
25 from . import (
27 from . import (
26 bundle2,
28 bundle2,
27 error,
29 error,
28 httpconnection,
30 httpconnection,
29 pycompat,
31 pycompat,
30 repository,
32 repository,
31 statichttprepo,
33 statichttprepo,
32 url as urlmod,
34 url as urlmod,
33 util,
35 util,
34 wireproto,
35 wireprotoframing,
36 wireprotoframing,
36 wireprototypes,
37 wireprototypes,
37 wireprotov1peer,
38 wireprotov1peer,
38 wireprotov2server,
39 wireprotov2server,
39 )
40 )
40
41
41 httplib = util.httplib
42 httplib = util.httplib
42 urlerr = util.urlerr
43 urlerr = util.urlerr
43 urlreq = util.urlreq
44 urlreq = util.urlreq
44
45
45 def encodevalueinheaders(value, header, limit):
46 def encodevalueinheaders(value, header, limit):
46 """Encode a string value into multiple HTTP headers.
47 """Encode a string value into multiple HTTP headers.
47
48
48 ``value`` will be encoded into 1 or more HTTP headers with the names
49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 name + value will be at most ``limit`` bytes long.
51 name + value will be at most ``limit`` bytes long.
51
52
52 Returns an iterable of 2-tuples consisting of header names and
53 Returns an iterable of 2-tuples consisting of header names and
53 values as native strings.
54 values as native strings.
54 """
55 """
55 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 # not bytes. This function always takes bytes in as arguments.
57 # not bytes. This function always takes bytes in as arguments.
57 fmt = pycompat.strurl(header) + r'-%s'
58 fmt = pycompat.strurl(header) + r'-%s'
58 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 # and not a unicode: we're just getting the encoded length anyway,
60 # and not a unicode: we're just getting the encoded length anyway,
60 # and using an r-string to make it portable between Python 2 and 3
61 # and using an r-string to make it portable between Python 2 and 3
61 # doesn't work because then the \r is a literal backslash-r
62 # doesn't work because then the \r is a literal backslash-r
62 # instead of a carriage return.
63 # instead of a carriage return.
63 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 result = []
65 result = []
65
66
66 n = 0
67 n = 0
67 for i in xrange(0, len(value), valuelen):
68 for i in xrange(0, len(value), valuelen):
68 n += 1
69 n += 1
69 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70
71
71 return result
72 return result
72
73
73 def _wraphttpresponse(resp):
74 def _wraphttpresponse(resp):
74 """Wrap an HTTPResponse with common error handlers.
75 """Wrap an HTTPResponse with common error handlers.
75
76
76 This ensures that any I/O from any consumer raises the appropriate
77 This ensures that any I/O from any consumer raises the appropriate
77 error and messaging.
78 error and messaging.
78 """
79 """
79 origread = resp.read
80 origread = resp.read
80
81
81 class readerproxy(resp.__class__):
82 class readerproxy(resp.__class__):
82 def read(self, size=None):
83 def read(self, size=None):
83 try:
84 try:
84 return origread(size)
85 return origread(size)
85 except httplib.IncompleteRead as e:
86 except httplib.IncompleteRead as e:
86 # e.expected is an integer if length known or None otherwise.
87 # e.expected is an integer if length known or None otherwise.
87 if e.expected:
88 if e.expected:
88 msg = _('HTTP request error (incomplete response; '
89 msg = _('HTTP request error (incomplete response; '
89 'expected %d bytes got %d)') % (e.expected,
90 'expected %d bytes got %d)') % (e.expected,
90 len(e.partial))
91 len(e.partial))
91 else:
92 else:
92 msg = _('HTTP request error (incomplete response)')
93 msg = _('HTTP request error (incomplete response)')
93
94
94 raise error.PeerTransportError(
95 raise error.PeerTransportError(
95 msg,
96 msg,
96 hint=_('this may be an intermittent network failure; '
97 hint=_('this may be an intermittent network failure; '
97 'if the error persists, consider contacting the '
98 'if the error persists, consider contacting the '
98 'network or server operator'))
99 'network or server operator'))
99 except httplib.HTTPException as e:
100 except httplib.HTTPException as e:
100 raise error.PeerTransportError(
101 raise error.PeerTransportError(
101 _('HTTP request error (%s)') % e,
102 _('HTTP request error (%s)') % e,
102 hint=_('this may be an intermittent network failure; '
103 hint=_('this may be an intermittent network failure; '
103 'if the error persists, consider contacting the '
104 'if the error persists, consider contacting the '
104 'network or server operator'))
105 'network or server operator'))
105
106
106 resp.__class__ = readerproxy
107 resp.__class__ = readerproxy
107
108
108 class _multifile(object):
109 class _multifile(object):
109 def __init__(self, *fileobjs):
110 def __init__(self, *fileobjs):
110 for f in fileobjs:
111 for f in fileobjs:
111 if not util.safehasattr(f, 'length'):
112 if not util.safehasattr(f, 'length'):
112 raise ValueError(
113 raise ValueError(
113 '_multifile only supports file objects that '
114 '_multifile only supports file objects that '
114 'have a length but this one does not:', type(f), f)
115 'have a length but this one does not:', type(f), f)
115 self._fileobjs = fileobjs
116 self._fileobjs = fileobjs
116 self._index = 0
117 self._index = 0
117
118
118 @property
119 @property
119 def length(self):
120 def length(self):
120 return sum(f.length for f in self._fileobjs)
121 return sum(f.length for f in self._fileobjs)
121
122
122 def read(self, amt=None):
123 def read(self, amt=None):
123 if amt <= 0:
124 if amt <= 0:
124 return ''.join(f.read() for f in self._fileobjs)
125 return ''.join(f.read() for f in self._fileobjs)
125 parts = []
126 parts = []
126 while amt and self._index < len(self._fileobjs):
127 while amt and self._index < len(self._fileobjs):
127 parts.append(self._fileobjs[self._index].read(amt))
128 parts.append(self._fileobjs[self._index].read(amt))
128 got = len(parts[-1])
129 got = len(parts[-1])
129 if got < amt:
130 if got < amt:
130 self._index += 1
131 self._index += 1
131 amt -= got
132 amt -= got
132 return ''.join(parts)
133 return ''.join(parts)
133
134
134 def seek(self, offset, whence=os.SEEK_SET):
135 def seek(self, offset, whence=os.SEEK_SET):
135 if whence != os.SEEK_SET:
136 if whence != os.SEEK_SET:
136 raise NotImplementedError(
137 raise NotImplementedError(
137 '_multifile does not support anything other'
138 '_multifile does not support anything other'
138 ' than os.SEEK_SET for whence on seek()')
139 ' than os.SEEK_SET for whence on seek()')
139 if offset != 0:
140 if offset != 0:
140 raise NotImplementedError(
141 raise NotImplementedError(
141 '_multifile only supports seeking to start, but that '
142 '_multifile only supports seeking to start, but that '
142 'could be fixed if you need it')
143 'could be fixed if you need it')
143 for f in self._fileobjs:
144 for f in self._fileobjs:
144 f.seek(0)
145 f.seek(0)
145 self._index = 0
146 self._index = 0
146
147
147 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 repobaseurl, cmd, args):
149 repobaseurl, cmd, args):
149 """Make an HTTP request to run a command for a version 1 client.
150 """Make an HTTP request to run a command for a version 1 client.
150
151
151 ``caps`` is a set of known server capabilities. The value may be
152 ``caps`` is a set of known server capabilities. The value may be
152 None if capabilities are not yet known.
153 None if capabilities are not yet known.
153
154
154 ``capablefn`` is a function to evaluate a capability.
155 ``capablefn`` is a function to evaluate a capability.
155
156
156 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 raw data to pass to it.
158 raw data to pass to it.
158 """
159 """
159 if cmd == 'pushkey':
160 if cmd == 'pushkey':
160 args['data'] = ''
161 args['data'] = ''
161 data = args.pop('data', None)
162 data = args.pop('data', None)
162 headers = args.pop('headers', {})
163 headers = args.pop('headers', {})
163
164
164 ui.debug("sending %s command\n" % cmd)
165 ui.debug("sending %s command\n" % cmd)
165 q = [('cmd', cmd)]
166 q = [('cmd', cmd)]
166 headersize = 0
167 headersize = 0
167 # Important: don't use self.capable() here or else you end up
168 # Important: don't use self.capable() here or else you end up
168 # with infinite recursion when trying to look up capabilities
169 # with infinite recursion when trying to look up capabilities
169 # for the first time.
170 # for the first time.
170 postargsok = caps is not None and 'httppostargs' in caps
171 postargsok = caps is not None and 'httppostargs' in caps
171
172
172 # Send arguments via POST.
173 # Send arguments via POST.
173 if postargsok and args:
174 if postargsok and args:
174 strargs = urlreq.urlencode(sorted(args.items()))
175 strargs = urlreq.urlencode(sorted(args.items()))
175 if not data:
176 if not data:
176 data = strargs
177 data = strargs
177 else:
178 else:
178 if isinstance(data, bytes):
179 if isinstance(data, bytes):
179 i = io.BytesIO(data)
180 i = io.BytesIO(data)
180 i.length = len(data)
181 i.length = len(data)
181 data = i
182 data = i
182 argsio = io.BytesIO(strargs)
183 argsio = io.BytesIO(strargs)
183 argsio.length = len(strargs)
184 argsio.length = len(strargs)
184 data = _multifile(argsio, data)
185 data = _multifile(argsio, data)
185 headers[r'X-HgArgs-Post'] = len(strargs)
186 headers[r'X-HgArgs-Post'] = len(strargs)
186 elif args:
187 elif args:
187 # Calling self.capable() can infinite loop if we are calling
188 # Calling self.capable() can infinite loop if we are calling
188 # "capabilities". But that command should never accept wire
189 # "capabilities". But that command should never accept wire
189 # protocol arguments. So this should never happen.
190 # protocol arguments. So this should never happen.
190 assert cmd != 'capabilities'
191 assert cmd != 'capabilities'
191 httpheader = capablefn('httpheader')
192 httpheader = capablefn('httpheader')
192 if httpheader:
193 if httpheader:
193 headersize = int(httpheader.split(',', 1)[0])
194 headersize = int(httpheader.split(',', 1)[0])
194
195
195 # Send arguments via HTTP headers.
196 # Send arguments via HTTP headers.
196 if headersize > 0:
197 if headersize > 0:
197 # The headers can typically carry more data than the URL.
198 # The headers can typically carry more data than the URL.
198 encargs = urlreq.urlencode(sorted(args.items()))
199 encargs = urlreq.urlencode(sorted(args.items()))
199 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 headersize):
201 headersize):
201 headers[header] = value
202 headers[header] = value
202 # Send arguments via query string (Mercurial <1.9).
203 # Send arguments via query string (Mercurial <1.9).
203 else:
204 else:
204 q += sorted(args.items())
205 q += sorted(args.items())
205
206
206 qs = '?%s' % urlreq.urlencode(q)
207 qs = '?%s' % urlreq.urlencode(q)
207 cu = "%s%s" % (repobaseurl, qs)
208 cu = "%s%s" % (repobaseurl, qs)
208 size = 0
209 size = 0
209 if util.safehasattr(data, 'length'):
210 if util.safehasattr(data, 'length'):
210 size = data.length
211 size = data.length
211 elif data is not None:
212 elif data is not None:
212 size = len(data)
213 size = len(data)
213 if data is not None and r'Content-Type' not in headers:
214 if data is not None and r'Content-Type' not in headers:
214 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 headers[r'Content-Type'] = r'application/mercurial-0.1'
215
216
216 # Tell the server we accept application/mercurial-0.2 and multiple
217 # Tell the server we accept application/mercurial-0.2 and multiple
217 # compression formats if the server is capable of emitting those
218 # compression formats if the server is capable of emitting those
218 # payloads.
219 # payloads.
219 # Note: Keep this set empty by default, as client advertisement of
220 # Note: Keep this set empty by default, as client advertisement of
220 # protocol parameters should only occur after the handshake.
221 # protocol parameters should only occur after the handshake.
221 protoparams = set()
222 protoparams = set()
222
223
223 mediatypes = set()
224 mediatypes = set()
224 if caps is not None:
225 if caps is not None:
225 mt = capablefn('httpmediatype')
226 mt = capablefn('httpmediatype')
226 if mt:
227 if mt:
227 protoparams.add('0.1')
228 protoparams.add('0.1')
228 mediatypes = set(mt.split(','))
229 mediatypes = set(mt.split(','))
229
230
230 protoparams.add('partial-pull')
231 protoparams.add('partial-pull')
231
232
232 if '0.2tx' in mediatypes:
233 if '0.2tx' in mediatypes:
233 protoparams.add('0.2')
234 protoparams.add('0.2')
234
235
235 if '0.2tx' in mediatypes and capablefn('compression'):
236 if '0.2tx' in mediatypes and capablefn('compression'):
236 # We /could/ compare supported compression formats and prune
237 # We /could/ compare supported compression formats and prune
237 # non-mutually supported or error if nothing is mutually supported.
238 # non-mutually supported or error if nothing is mutually supported.
238 # For now, send the full list to the server and have it error.
239 # For now, send the full list to the server and have it error.
239 comps = [e.wireprotosupport().name for e in
240 comps = [e.wireprotosupport().name for e in
240 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 protoparams.add('comp=%s' % ','.join(comps))
242 protoparams.add('comp=%s' % ','.join(comps))
242
243
243 if protoparams:
244 if protoparams:
244 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 'X-HgProto',
246 'X-HgProto',
246 headersize or 1024)
247 headersize or 1024)
247 for header, value in protoheaders:
248 for header, value in protoheaders:
248 headers[header] = value
249 headers[header] = value
249
250
250 varyheaders = []
251 varyheaders = []
251 for header in headers:
252 for header in headers:
252 if header.lower().startswith(r'x-hg'):
253 if header.lower().startswith(r'x-hg'):
253 varyheaders.append(header)
254 varyheaders.append(header)
254
255
255 if varyheaders:
256 if varyheaders:
256 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 headers[r'Vary'] = r','.join(sorted(varyheaders))
257
258
258 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 req = requestbuilder(pycompat.strurl(cu), data, headers)
259
260
260 if data is not None:
261 if data is not None:
261 ui.debug("sending %d bytes\n" % size)
262 ui.debug("sending %d bytes\n" % size)
262 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263
264
264 return req, cu, qs
265 return req, cu, qs
265
266
266 def sendrequest(ui, opener, req):
267 def sendrequest(ui, opener, req):
267 """Send a prepared HTTP request.
268 """Send a prepared HTTP request.
268
269
269 Returns the response object.
270 Returns the response object.
270 """
271 """
271 if (ui.debugflag
272 if (ui.debugflag
272 and ui.configbool('devel', 'debug.peer-request')):
273 and ui.configbool('devel', 'debug.peer-request')):
273 dbg = ui.debug
274 dbg = ui.debug
274 line = 'devel-peer-request: %s\n'
275 line = 'devel-peer-request: %s\n'
275 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
276 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
276 hgargssize = None
277 hgargssize = None
277
278
278 for header, value in sorted(req.header_items()):
279 for header, value in sorted(req.header_items()):
279 if header.startswith('X-hgarg-'):
280 if header.startswith('X-hgarg-'):
280 if hgargssize is None:
281 if hgargssize is None:
281 hgargssize = 0
282 hgargssize = 0
282 hgargssize += len(value)
283 hgargssize += len(value)
283 else:
284 else:
284 dbg(line % ' %s %s' % (header, value))
285 dbg(line % ' %s %s' % (header, value))
285
286
286 if hgargssize is not None:
287 if hgargssize is not None:
287 dbg(line % ' %d bytes of commands arguments in headers'
288 dbg(line % ' %d bytes of commands arguments in headers'
288 % hgargssize)
289 % hgargssize)
289
290
290 if req.has_data():
291 if req.has_data():
291 data = req.get_data()
292 data = req.get_data()
292 length = getattr(data, 'length', None)
293 length = getattr(data, 'length', None)
293 if length is None:
294 if length is None:
294 length = len(data)
295 length = len(data)
295 dbg(line % ' %d bytes of data' % length)
296 dbg(line % ' %d bytes of data' % length)
296
297
297 start = util.timer()
298 start = util.timer()
298
299
299 try:
300 try:
300 res = opener.open(req)
301 res = opener.open(req)
301 except urlerr.httperror as inst:
302 except urlerr.httperror as inst:
302 if inst.code == 401:
303 if inst.code == 401:
303 raise error.Abort(_('authorization failed'))
304 raise error.Abort(_('authorization failed'))
304 raise
305 raise
305 except httplib.HTTPException as inst:
306 except httplib.HTTPException as inst:
306 ui.debug('http error requesting %s\n' %
307 ui.debug('http error requesting %s\n' %
307 util.hidepassword(req.get_full_url()))
308 util.hidepassword(req.get_full_url()))
308 ui.traceback()
309 ui.traceback()
309 raise IOError(None, inst)
310 raise IOError(None, inst)
310 finally:
311 finally:
311 if ui.configbool('devel', 'debug.peer-request'):
312 if ui.configbool('devel', 'debug.peer-request'):
312 dbg(line % ' finished in %.4f seconds (%s)'
313 dbg(line % ' finished in %.4f seconds (%s)'
313 % (util.timer() - start, res.code))
314 % (util.timer() - start, res.code))
314
315
315 # Insert error handlers for common I/O failures.
316 # Insert error handlers for common I/O failures.
316 _wraphttpresponse(res)
317 _wraphttpresponse(res)
317
318
318 return res
319 return res
319
320
320 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
321 allowcbor=False):
322 allowcbor=False):
322 # record the url we got redirected to
323 # record the url we got redirected to
323 respurl = pycompat.bytesurl(resp.geturl())
324 respurl = pycompat.bytesurl(resp.geturl())
324 if respurl.endswith(qs):
325 if respurl.endswith(qs):
325 respurl = respurl[:-len(qs)]
326 respurl = respurl[:-len(qs)]
326 if baseurl.rstrip('/') != respurl.rstrip('/'):
327 if baseurl.rstrip('/') != respurl.rstrip('/'):
327 if not ui.quiet:
328 if not ui.quiet:
328 ui.warn(_('real URL is %s\n') % respurl)
329 ui.warn(_('real URL is %s\n') % respurl)
329
330
330 try:
331 try:
331 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
332 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
332 except AttributeError:
333 except AttributeError:
333 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
334 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
334
335
335 safeurl = util.hidepassword(baseurl)
336 safeurl = util.hidepassword(baseurl)
336 if proto.startswith('application/hg-error'):
337 if proto.startswith('application/hg-error'):
337 raise error.OutOfBandError(resp.read())
338 raise error.OutOfBandError(resp.read())
338
339
339 # Pre 1.0 versions of Mercurial used text/plain and
340 # Pre 1.0 versions of Mercurial used text/plain and
340 # application/hg-changegroup. We don't support such old servers.
341 # application/hg-changegroup. We don't support such old servers.
341 if not proto.startswith('application/mercurial-'):
342 if not proto.startswith('application/mercurial-'):
342 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
343 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
343 raise error.RepoError(
344 raise error.RepoError(
344 _("'%s' does not appear to be an hg repository:\n"
345 _("'%s' does not appear to be an hg repository:\n"
345 "---%%<--- (%s)\n%s\n---%%<---\n")
346 "---%%<--- (%s)\n%s\n---%%<---\n")
346 % (safeurl, proto or 'no content-type', resp.read(1024)))
347 % (safeurl, proto or 'no content-type', resp.read(1024)))
347
348
348 try:
349 try:
349 subtype = proto.split('-', 1)[1]
350 subtype = proto.split('-', 1)[1]
350
351
351 # Unless we end up supporting CBOR in the legacy wire protocol,
352 # Unless we end up supporting CBOR in the legacy wire protocol,
352 # this should ONLY be encountered for the initial capabilities
353 # this should ONLY be encountered for the initial capabilities
353 # request during handshake.
354 # request during handshake.
354 if subtype == 'cbor':
355 if subtype == 'cbor':
355 if allowcbor:
356 if allowcbor:
356 return respurl, proto, resp
357 return respurl, proto, resp
357 else:
358 else:
358 raise error.RepoError(_('unexpected CBOR response from '
359 raise error.RepoError(_('unexpected CBOR response from '
359 'server'))
360 'server'))
360
361
361 version_info = tuple([int(n) for n in subtype.split('.')])
362 version_info = tuple([int(n) for n in subtype.split('.')])
362 except ValueError:
363 except ValueError:
363 raise error.RepoError(_("'%s' sent a broken Content-Type "
364 raise error.RepoError(_("'%s' sent a broken Content-Type "
364 "header (%s)") % (safeurl, proto))
365 "header (%s)") % (safeurl, proto))
365
366
366 # TODO consider switching to a decompression reader that uses
367 # TODO consider switching to a decompression reader that uses
367 # generators.
368 # generators.
368 if version_info == (0, 1):
369 if version_info == (0, 1):
369 if compressible:
370 if compressible:
370 resp = util.compengines['zlib'].decompressorreader(resp)
371 resp = util.compengines['zlib'].decompressorreader(resp)
371
372
372 elif version_info == (0, 2):
373 elif version_info == (0, 2):
373 # application/mercurial-0.2 always identifies the compression
374 # application/mercurial-0.2 always identifies the compression
374 # engine in the payload header.
375 # engine in the payload header.
375 elen = struct.unpack('B', resp.read(1))[0]
376 elen = struct.unpack('B', resp.read(1))[0]
376 ename = resp.read(elen)
377 ename = resp.read(elen)
377 engine = util.compengines.forwiretype(ename)
378 engine = util.compengines.forwiretype(ename)
378
379
379 resp = engine.decompressorreader(resp)
380 resp = engine.decompressorreader(resp)
380 else:
381 else:
381 raise error.RepoError(_("'%s' uses newer protocol %s") %
382 raise error.RepoError(_("'%s' uses newer protocol %s") %
382 (safeurl, subtype))
383 (safeurl, subtype))
383
384
384 return respurl, proto, resp
385 return respurl, proto, resp
385
386
386 class httppeer(wireprotov1peer.wirepeer):
387 class httppeer(wireprotov1peer.wirepeer):
387 def __init__(self, ui, path, url, opener, requestbuilder, caps):
388 def __init__(self, ui, path, url, opener, requestbuilder, caps):
388 self.ui = ui
389 self.ui = ui
389 self._path = path
390 self._path = path
390 self._url = url
391 self._url = url
391 self._caps = caps
392 self._caps = caps
392 self._urlopener = opener
393 self._urlopener = opener
393 self._requestbuilder = requestbuilder
394 self._requestbuilder = requestbuilder
394
395
395 def __del__(self):
396 def __del__(self):
396 for h in self._urlopener.handlers:
397 for h in self._urlopener.handlers:
397 h.close()
398 h.close()
398 getattr(h, "close_all", lambda: None)()
399 getattr(h, "close_all", lambda: None)()
399
400
400 # Begin of ipeerconnection interface.
401 # Begin of ipeerconnection interface.
401
402
402 def url(self):
403 def url(self):
403 return self._path
404 return self._path
404
405
405 def local(self):
406 def local(self):
406 return None
407 return None
407
408
408 def peer(self):
409 def peer(self):
409 return self
410 return self
410
411
411 def canpush(self):
412 def canpush(self):
412 return True
413 return True
413
414
414 def close(self):
415 def close(self):
415 pass
416 pass
416
417
417 # End of ipeerconnection interface.
418 # End of ipeerconnection interface.
418
419
419 # Begin of ipeercommands interface.
420 # Begin of ipeercommands interface.
420
421
421 def capabilities(self):
422 def capabilities(self):
422 return self._caps
423 return self._caps
423
424
424 # End of ipeercommands interface.
425 # End of ipeercommands interface.
425
426
426 # look up capabilities only when needed
427 # look up capabilities only when needed
427
428
428 def _callstream(self, cmd, _compressible=False, **args):
429 def _callstream(self, cmd, _compressible=False, **args):
429 args = pycompat.byteskwargs(args)
430 args = pycompat.byteskwargs(args)
430
431
431 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
432 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
432 self._caps, self.capable,
433 self._caps, self.capable,
433 self._url, cmd, args)
434 self._url, cmd, args)
434
435
435 resp = sendrequest(self.ui, self._urlopener, req)
436 resp = sendrequest(self.ui, self._urlopener, req)
436
437
437 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
438 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
438 resp, _compressible)
439 resp, _compressible)
439
440
440 return resp
441 return resp
441
442
442 def _call(self, cmd, **args):
443 def _call(self, cmd, **args):
443 fp = self._callstream(cmd, **args)
444 fp = self._callstream(cmd, **args)
444 try:
445 try:
445 return fp.read()
446 return fp.read()
446 finally:
447 finally:
447 # if using keepalive, allow connection to be reused
448 # if using keepalive, allow connection to be reused
448 fp.close()
449 fp.close()
449
450
450 def _callpush(self, cmd, cg, **args):
451 def _callpush(self, cmd, cg, **args):
451 # have to stream bundle to a temp file because we do not have
452 # have to stream bundle to a temp file because we do not have
452 # http 1.1 chunked transfer.
453 # http 1.1 chunked transfer.
453
454
454 types = self.capable('unbundle')
455 types = self.capable('unbundle')
455 try:
456 try:
456 types = types.split(',')
457 types = types.split(',')
457 except AttributeError:
458 except AttributeError:
458 # servers older than d1b16a746db6 will send 'unbundle' as a
459 # servers older than d1b16a746db6 will send 'unbundle' as a
459 # boolean capability. They only support headerless/uncompressed
460 # boolean capability. They only support headerless/uncompressed
460 # bundles.
461 # bundles.
461 types = [""]
462 types = [""]
462 for x in types:
463 for x in types:
463 if x in bundle2.bundletypes:
464 if x in bundle2.bundletypes:
464 type = x
465 type = x
465 break
466 break
466
467
467 tempname = bundle2.writebundle(self.ui, cg, None, type)
468 tempname = bundle2.writebundle(self.ui, cg, None, type)
468 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
469 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
469 headers = {r'Content-Type': r'application/mercurial-0.1'}
470 headers = {r'Content-Type': r'application/mercurial-0.1'}
470
471
471 try:
472 try:
472 r = self._call(cmd, data=fp, headers=headers, **args)
473 r = self._call(cmd, data=fp, headers=headers, **args)
473 vals = r.split('\n', 1)
474 vals = r.split('\n', 1)
474 if len(vals) < 2:
475 if len(vals) < 2:
475 raise error.ResponseError(_("unexpected response:"), r)
476 raise error.ResponseError(_("unexpected response:"), r)
476 return vals
477 return vals
477 except urlerr.httperror:
478 except urlerr.httperror:
478 # Catch and re-raise these so we don't try and treat them
479 # Catch and re-raise these so we don't try and treat them
479 # like generic socket errors. They lack any values in
480 # like generic socket errors. They lack any values in
480 # .args on Python 3 which breaks our socket.error block.
481 # .args on Python 3 which breaks our socket.error block.
481 raise
482 raise
482 except socket.error as err:
483 except socket.error as err:
483 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
484 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
484 raise error.Abort(_('push failed: %s') % err.args[1])
485 raise error.Abort(_('push failed: %s') % err.args[1])
485 raise error.Abort(err.args[1])
486 raise error.Abort(err.args[1])
486 finally:
487 finally:
487 fp.close()
488 fp.close()
488 os.unlink(tempname)
489 os.unlink(tempname)
489
490
490 def _calltwowaystream(self, cmd, fp, **args):
491 def _calltwowaystream(self, cmd, fp, **args):
491 fh = None
492 fh = None
492 fp_ = None
493 fp_ = None
493 filename = None
494 filename = None
494 try:
495 try:
495 # dump bundle to disk
496 # dump bundle to disk
496 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
497 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
497 fh = os.fdopen(fd, r"wb")
498 fh = os.fdopen(fd, r"wb")
498 d = fp.read(4096)
499 d = fp.read(4096)
499 while d:
500 while d:
500 fh.write(d)
501 fh.write(d)
501 d = fp.read(4096)
502 d = fp.read(4096)
502 fh.close()
503 fh.close()
503 # start http push
504 # start http push
504 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
505 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
505 headers = {r'Content-Type': r'application/mercurial-0.1'}
506 headers = {r'Content-Type': r'application/mercurial-0.1'}
506 return self._callstream(cmd, data=fp_, headers=headers, **args)
507 return self._callstream(cmd, data=fp_, headers=headers, **args)
507 finally:
508 finally:
508 if fp_ is not None:
509 if fp_ is not None:
509 fp_.close()
510 fp_.close()
510 if fh is not None:
511 if fh is not None:
511 fh.close()
512 fh.close()
512 os.unlink(filename)
513 os.unlink(filename)
513
514
514 def _callcompressable(self, cmd, **args):
515 def _callcompressable(self, cmd, **args):
515 return self._callstream(cmd, _compressible=True, **args)
516 return self._callstream(cmd, _compressible=True, **args)
516
517
517 def _abort(self, exception):
518 def _abort(self, exception):
518 raise exception
519 raise exception
519
520
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
523 buffersends=True)
524
525 url = '%s/%s' % (apiurl, permission)
526
527 if len(requests) > 1:
528 url += '/multirequest'
529 else:
530 url += '/%s' % requests[0][0]
531
532 # Request ID to (request, future)
533 requestmap = {}
534
535 for command, args, f in requests:
536 request, action, meta = reactor.callcommand(command, args)
537 assert action == 'noop'
538
539 requestmap[request.requestid] = (request, f)
540
541 action, meta = reactor.flushcommands()
542 assert action == 'sendframes'
543
544 # TODO stream this.
545 body = b''.join(map(bytes, meta['framegen']))
546
547 # TODO modify user-agent to reflect v2
548 headers = {
549 r'Accept': wireprotov2server.FRAMINGTYPE,
550 r'Content-Type': wireprotov2server.FRAMINGTYPE,
551 }
552
553 req = requestbuilder(pycompat.strurl(url), body, headers)
554 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
555
556 try:
557 res = opener.open(req)
558 except urlerr.httperror as e:
559 if e.code == 401:
560 raise error.Abort(_('authorization failed'))
561
562 raise
563 except httplib.HTTPException as e:
564 ui.traceback()
565 raise IOError(None, e)
566
567 return reactor, requestmap, res
568
569 class queuedcommandfuture(pycompat.futures.Future):
570 """Wraps result() on command futures to trigger submission on call."""
571
572 def result(self, timeout=None):
573 if self.done():
574 return pycompat.futures.Future.result(self, timeout)
575
576 self._peerexecutor.sendcommands()
577
578 # sendcommands() will restore the original __class__ and self.result
579 # will resolve to Future.result.
580 return self.result(timeout)
581
582 @zi.implementer(repository.ipeercommandexecutor)
583 class httpv2executor(object):
584 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
585 self._ui = ui
586 self._opener = opener
587 self._requestbuilder = requestbuilder
588 self._apiurl = apiurl
589 self._descriptor = descriptor
590 self._sent = False
591 self._closed = False
592 self._neededpermissions = set()
593 self._calls = []
594 self._futures = weakref.WeakSet()
595 self._responseexecutor = None
596 self._responsef = None
597
598 def __enter__(self):
599 return self
600
601 def __exit__(self, exctype, excvalue, exctb):
602 self.close()
603
604 def callcommand(self, command, args):
605 if self._sent:
606 raise error.ProgrammingError('callcommand() cannot be used after '
607 'commands are sent')
608
609 if self._closed:
610 raise error.ProgrammingError('callcommand() cannot be used after '
611 'close()')
612
613 # The service advertises which commands are available. So if we attempt
614 # to call an unknown command or pass an unknown argument, we can screen
615 # for this.
616 if command not in self._descriptor['commands']:
617 raise error.ProgrammingError(
618 'wire protocol command %s is not available' % command)
619
620 cmdinfo = self._descriptor['commands'][command]
621 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
622
623 if unknownargs:
624 raise error.ProgrammingError(
625 'wire protocol command %s does not accept argument: %s' % (
626 command, ', '.join(sorted(unknownargs))))
627
628 self._neededpermissions |= set(cmdinfo['permissions'])
629
630 # TODO we /could/ also validate types here, since the API descriptor
631 # includes types...
632
633 f = pycompat.futures.Future()
634
635 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
636 # could deadlock.
637 f.__class__ = queuedcommandfuture
638 f._peerexecutor = self
639
640 self._futures.add(f)
641 self._calls.append((command, args, f))
642
643 return f
644
645 def sendcommands(self):
646 if self._sent:
647 return
648
649 if not self._calls:
650 return
651
652 self._sent = True
653
654 # Unhack any future types so caller sees a clean type and so we
655 # break reference cycle.
656 for f in self._futures:
657 if isinstance(f, queuedcommandfuture):
658 f.__class__ = pycompat.futures.Future
659 f._peerexecutor = None
660
661 # Mark the future as running and filter out cancelled futures.
662 calls = [(command, args, f)
663 for command, args, f in self._calls
664 if f.set_running_or_notify_cancel()]
665
666 # Clear out references, prevent improper object usage.
667 self._calls = None
668
669 if not calls:
670 return
671
672 permissions = set(self._neededpermissions)
673
674 if 'push' in permissions and 'pull' in permissions:
675 permissions.remove('pull')
676
677 if len(permissions) > 1:
678 raise error.RepoError(_('cannot make request requiring multiple '
679 'permissions: %s') %
680 _(', ').join(sorted(permissions)))
681
682 permission = {
683 'push': 'rw',
684 'pull': 'ro',
685 }[permissions.pop()]
686
687 reactor, requests, resp = sendv2request(
688 self._ui, self._opener, self._requestbuilder, self._apiurl,
689 permission, calls)
690
691 # TODO we probably want to validate the HTTP code, media type, etc.
692
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
694 self._responsef = self._responseexecutor.submit(self._handleresponse,
695 reactor,
696 requests,
697 resp)
698
699 def close(self):
700 if self._closed:
701 return
702
703 self.sendcommands()
704
705 self._closed = True
706
707 if not self._responsef:
708 return
709
710 try:
711 self._responsef.result()
712 finally:
713 self._responseexecutor.shutdown(wait=True)
714 self._responsef = None
715 self._responseexecutor = None
716
717 # If any of our futures are still in progress, mark them as
718 # errored, otherwise a result() could wait indefinitely.
719 for f in self._futures:
720 if not f.done():
721 f.set_exception(error.ResponseError(
722 _('unfulfilled command response')))
723
724 self._futures = None
725
726 def _handleresponse(self, reactor, requests, resp):
727 # Called in a thread to read the response.
728
729 results = {k: [] for k in requests}
730
731 while True:
732 frame = wireprotoframing.readframe(resp)
733 if frame is None:
734 break
735
736 self._ui.note(_('received %r\n') % frame)
737
738 # Guard against receiving a frame with a request ID that we
739 # didn't issue. This should never happen.
740 request, f = requests.get(frame.requestid, [None, None])
741
742 action, meta = reactor.onframerecv(frame)
743
744 if action == 'responsedata':
745 assert request.requestid == meta['request'].requestid
746
747 result = results[request.requestid]
748
749 if meta['cbor']:
750 payload = util.bytesio(meta['data'])
751
752 decoder = cbor.CBORDecoder(payload)
753 while payload.tell() + 1 < len(meta['data']):
754 try:
755 result.append(decoder.decode())
756 except Exception:
757 f.set_exception_info(*sys.exc_info()[1:])
758 continue
759 else:
760 result.append(meta['data'])
761
762 if meta['eos']:
763 f.set_result(result)
764 del results[request.requestid]
765
766 else:
767 e = error.ProgrammingError('unhandled action: %s' % action)
768
769 if f:
770 f.set_exception(e)
771 else:
772 raise e
773
520 # TODO implement interface for version 2 peers
774 # TODO implement interface for version 2 peers
521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
776 repository.ipeerrequests)
522 class httpv2peer(object):
777 class httpv2peer(object):
523 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
778 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
524 apidescriptor):
779 apidescriptor):
525 self.ui = ui
780 self.ui = ui
526
781
527 if repourl.endswith('/'):
782 if repourl.endswith('/'):
528 repourl = repourl[:-1]
783 repourl = repourl[:-1]
529
784
530 self._url = repourl
785 self._url = repourl
531 self._apipath = apipath
786 self._apipath = apipath
787 self._apiurl = '%s/%s' % (repourl, apipath)
532 self._opener = opener
788 self._opener = opener
533 self._requestbuilder = requestbuilder
789 self._requestbuilder = requestbuilder
534 self._descriptor = apidescriptor
790 self._descriptor = apidescriptor
535
791
536 # Start of ipeerconnection.
792 # Start of ipeerconnection.
537
793
538 def url(self):
794 def url(self):
539 return self._url
795 return self._url
540
796
541 def local(self):
797 def local(self):
542 return None
798 return None
543
799
544 def peer(self):
800 def peer(self):
545 return self
801 return self
546
802
547 def canpush(self):
803 def canpush(self):
548 # TODO change once implemented.
804 # TODO change once implemented.
549 return False
805 return False
550
806
551 def close(self):
807 def close(self):
552 pass
808 pass
553
809
554 # End of ipeerconnection.
810 # End of ipeerconnection.
555
811
556 # Start of ipeercapabilities.
812 # Start of ipeercapabilities.
557
813
558 def capable(self, name):
814 def capable(self, name):
559 # The capabilities used internally historically map to capabilities
815 # The capabilities used internally historically map to capabilities
560 # advertised from the "capabilities" wire protocol command. However,
816 # advertised from the "capabilities" wire protocol command. However,
561 # version 2 of that command works differently.
817 # version 2 of that command works differently.
562
818
563 # Maps to commands that are available.
819 # Maps to commands that are available.
564 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
820 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
565 return True
821 return True
566
822
567 # Other concepts.
823 # Other concepts.
568 if name in ('bundle2',):
824 if name in ('bundle2',):
569 return True
825 return True
570
826
571 return False
827 return False
572
828
573 def requirecap(self, name, purpose):
829 def requirecap(self, name, purpose):
574 if self.capable(name):
830 if self.capable(name):
575 return
831 return
576
832
577 raise error.CapabilityError(
833 raise error.CapabilityError(
578 _('cannot %s; client or remote repository does not support the %r '
834 _('cannot %s; client or remote repository does not support the %r '
579 'capability') % (purpose, name))
835 'capability') % (purpose, name))
580
836
581 # End of ipeercapabilities.
837 # End of ipeercapabilities.
582
838
583 # TODO require to be part of a batched primitive, use futures.
584 def _call(self, name, **args):
839 def _call(self, name, **args):
585 """Call a wire protocol command with arguments."""
840 with self.commandexecutor() as e:
586
841 return e.callcommand(name, args).result()
587 # Having this early has a side-effect of importing wireprotov2server,
588 # which has the side-effect of ensuring commands are registered.
589
590 # TODO modify user-agent to reflect v2.
591 headers = {
592 r'Accept': wireprotov2server.FRAMINGTYPE,
593 r'Content-Type': wireprotov2server.FRAMINGTYPE,
594 }
595
596 # TODO permissions should come from capabilities results.
597 permission = wireproto.commandsv2[name].permission
598 if permission not in ('push', 'pull'):
599 raise error.ProgrammingError('unknown permission type: %s' %
600 permission)
601
602 permission = {
603 'push': 'rw',
604 'pull': 'ro',
605 }[permission]
606
607 url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
608
609 # TODO this should be part of a generic peer for the frame-based
610 # protocol.
611 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
612 buffersends=True)
613
614 request, action, meta = reactor.callcommand(name, args)
615 assert action == 'noop'
616
617 action, meta = reactor.flushcommands()
618 assert action == 'sendframes'
619
842
620 body = b''.join(map(bytes, meta['framegen']))
843 def commandexecutor(self):
621 req = self._requestbuilder(pycompat.strurl(url), body, headers)
844 return httpv2executor(self.ui, self._opener, self._requestbuilder,
622 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
845 self._apiurl, self._descriptor)
623
624 # TODO unify this code with httppeer.
625 try:
626 res = self._opener.open(req)
627 except urlerr.httperror as e:
628 if e.code == 401:
629 raise error.Abort(_('authorization failed'))
630
631 raise
632 except httplib.HTTPException as e:
633 self.ui.traceback()
634 raise IOError(None, e)
635
636 # TODO validate response type, wrap response to handle I/O errors.
637 # TODO more robust frame receiver.
638 results = []
639
640 while True:
641 frame = wireprotoframing.readframe(res)
642 if frame is None:
643 break
644
645 self.ui.note(_('received %r\n') % frame)
646
647 action, meta = reactor.onframerecv(frame)
648
649 if action == 'responsedata':
650 if meta['cbor']:
651 payload = util.bytesio(meta['data'])
652
653 decoder = cbor.CBORDecoder(payload)
654 while payload.tell() + 1 < len(meta['data']):
655 results.append(decoder.decode())
656 else:
657 results.append(meta['data'])
658 else:
659 error.ProgrammingError('unhandled action: %s' % action)
660
661 return results
662
846
663 # Registry of API service names to metadata about peers that handle it.
847 # Registry of API service names to metadata about peers that handle it.
664 #
848 #
665 # The following keys are meaningful:
849 # The following keys are meaningful:
666 #
850 #
667 # init
851 # init
668 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
852 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
669 # apidescriptor) to create a peer.
853 # apidescriptor) to create a peer.
670 #
854 #
671 # priority
855 # priority
672 # Integer priority for the service. If we could choose from multiple
856 # Integer priority for the service. If we could choose from multiple
673 # services, we choose the one with the highest priority.
857 # services, we choose the one with the highest priority.
674 API_PEERS = {
858 API_PEERS = {
675 wireprototypes.HTTP_WIREPROTO_V2: {
859 wireprototypes.HTTP_WIREPROTO_V2: {
676 'init': httpv2peer,
860 'init': httpv2peer,
677 'priority': 50,
861 'priority': 50,
678 },
862 },
679 }
863 }
680
864
681 def performhandshake(ui, url, opener, requestbuilder):
865 def performhandshake(ui, url, opener, requestbuilder):
682 # The handshake is a request to the capabilities command.
866 # The handshake is a request to the capabilities command.
683
867
684 caps = None
868 caps = None
685 def capable(x):
869 def capable(x):
686 raise error.ProgrammingError('should not be called')
870 raise error.ProgrammingError('should not be called')
687
871
688 args = {}
872 args = {}
689
873
690 # The client advertises support for newer protocols by adding an
874 # The client advertises support for newer protocols by adding an
691 # X-HgUpgrade-* header with a list of supported APIs and an
875 # X-HgUpgrade-* header with a list of supported APIs and an
692 # X-HgProto-* header advertising which serializing formats it supports.
876 # X-HgProto-* header advertising which serializing formats it supports.
693 # We only support the HTTP version 2 transport and CBOR responses for
877 # We only support the HTTP version 2 transport and CBOR responses for
694 # now.
878 # now.
695 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
879 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
696
880
697 if advertisev2:
881 if advertisev2:
698 args['headers'] = {
882 args['headers'] = {
699 r'X-HgProto-1': r'cbor',
883 r'X-HgProto-1': r'cbor',
700 }
884 }
701
885
702 args['headers'].update(
886 args['headers'].update(
703 encodevalueinheaders(' '.join(sorted(API_PEERS)),
887 encodevalueinheaders(' '.join(sorted(API_PEERS)),
704 'X-HgUpgrade',
888 'X-HgUpgrade',
705 # We don't know the header limit this early.
889 # We don't know the header limit this early.
706 # So make it small.
890 # So make it small.
707 1024))
891 1024))
708
892
709 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
893 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
710 capable, url, 'capabilities',
894 capable, url, 'capabilities',
711 args)
895 args)
712
896
713 resp = sendrequest(ui, opener, req)
897 resp = sendrequest(ui, opener, req)
714
898
715 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
716 compressible=False,
900 compressible=False,
717 allowcbor=advertisev2)
901 allowcbor=advertisev2)
718
902
719 try:
903 try:
720 rawdata = resp.read()
904 rawdata = resp.read()
721 finally:
905 finally:
722 resp.close()
906 resp.close()
723
907
724 if not ct.startswith('application/mercurial-'):
908 if not ct.startswith('application/mercurial-'):
725 raise error.ProgrammingError('unexpected content-type: %s' % ct)
909 raise error.ProgrammingError('unexpected content-type: %s' % ct)
726
910
727 if advertisev2:
911 if advertisev2:
728 if ct == 'application/mercurial-cbor':
912 if ct == 'application/mercurial-cbor':
729 try:
913 try:
730 info = cbor.loads(rawdata)
914 info = cbor.loads(rawdata)
731 except cbor.CBORDecodeError:
915 except cbor.CBORDecodeError:
732 raise error.Abort(_('error decoding CBOR from remote server'),
916 raise error.Abort(_('error decoding CBOR from remote server'),
733 hint=_('try again and consider contacting '
917 hint=_('try again and consider contacting '
734 'the server operator'))
918 'the server operator'))
735
919
736 # We got a legacy response. That's fine.
920 # We got a legacy response. That's fine.
737 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
921 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
738 info = {
922 info = {
739 'v1capabilities': set(rawdata.split())
923 'v1capabilities': set(rawdata.split())
740 }
924 }
741
925
742 else:
926 else:
743 raise error.RepoError(
927 raise error.RepoError(
744 _('unexpected response type from server: %s') % ct)
928 _('unexpected response type from server: %s') % ct)
745 else:
929 else:
746 info = {
930 info = {
747 'v1capabilities': set(rawdata.split())
931 'v1capabilities': set(rawdata.split())
748 }
932 }
749
933
750 return respurl, info
934 return respurl, info
751
935
752 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
936 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
753 """Construct an appropriate HTTP peer instance.
937 """Construct an appropriate HTTP peer instance.
754
938
755 ``opener`` is an ``url.opener`` that should be used to establish
939 ``opener`` is an ``url.opener`` that should be used to establish
756 connections, perform HTTP requests.
940 connections, perform HTTP requests.
757
941
758 ``requestbuilder`` is the type used for constructing HTTP requests.
942 ``requestbuilder`` is the type used for constructing HTTP requests.
759 It exists as an argument so extensions can override the default.
943 It exists as an argument so extensions can override the default.
760 """
944 """
761 u = util.url(path)
945 u = util.url(path)
762 if u.query or u.fragment:
946 if u.query or u.fragment:
763 raise error.Abort(_('unsupported URL component: "%s"') %
947 raise error.Abort(_('unsupported URL component: "%s"') %
764 (u.query or u.fragment))
948 (u.query or u.fragment))
765
949
766 # urllib cannot handle URLs with embedded user or passwd.
950 # urllib cannot handle URLs with embedded user or passwd.
767 url, authinfo = u.authinfo()
951 url, authinfo = u.authinfo()
768 ui.debug('using %s\n' % url)
952 ui.debug('using %s\n' % url)
769
953
770 opener = opener or urlmod.opener(ui, authinfo)
954 opener = opener or urlmod.opener(ui, authinfo)
771
955
772 respurl, info = performhandshake(ui, url, opener, requestbuilder)
956 respurl, info = performhandshake(ui, url, opener, requestbuilder)
773
957
774 # Given the intersection of APIs that both we and the server support,
958 # Given the intersection of APIs that both we and the server support,
775 # sort by their advertised priority and pick the first one.
959 # sort by their advertised priority and pick the first one.
776 #
960 #
777 # TODO consider making this request-based and interface driven. For
961 # TODO consider making this request-based and interface driven. For
778 # example, the caller could say "I want a peer that does X." It's quite
962 # example, the caller could say "I want a peer that does X." It's quite
779 # possible that not all peers would do that. Since we know the service
963 # possible that not all peers would do that. Since we know the service
780 # capabilities, we could filter out services not meeting the
964 # capabilities, we could filter out services not meeting the
781 # requirements. Possibly by consulting the interfaces defined by the
965 # requirements. Possibly by consulting the interfaces defined by the
782 # peer type.
966 # peer type.
783 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
967 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
784
968
785 preferredchoices = sorted(apipeerchoices,
969 preferredchoices = sorted(apipeerchoices,
786 key=lambda x: API_PEERS[x]['priority'],
970 key=lambda x: API_PEERS[x]['priority'],
787 reverse=True)
971 reverse=True)
788
972
789 for service in preferredchoices:
973 for service in preferredchoices:
790 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
974 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
791
975
792 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
976 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
793 requestbuilder,
977 requestbuilder,
794 info['apis'][service])
978 info['apis'][service])
795
979
796 # Failed to construct an API peer. Fall back to legacy.
980 # Failed to construct an API peer. Fall back to legacy.
797 return httppeer(ui, path, respurl, opener, requestbuilder,
981 return httppeer(ui, path, respurl, opener, requestbuilder,
798 info['v1capabilities'])
982 info['v1capabilities'])
799
983
800 def instance(ui, path, create):
984 def instance(ui, path, create):
801 if create:
985 if create:
802 raise error.Abort(_('cannot create new http repository'))
986 raise error.Abort(_('cannot create new http repository'))
803 try:
987 try:
804 if path.startswith('https:') and not urlmod.has_https:
988 if path.startswith('https:') and not urlmod.has_https:
805 raise error.Abort(_('Python support for SSL and HTTPS '
989 raise error.Abort(_('Python support for SSL and HTTPS '
806 'is not installed'))
990 'is not installed'))
807
991
808 inst = makepeer(ui, path)
992 inst = makepeer(ui, path)
809
993
810 return inst
994 return inst
811 except error.RepoError as httpexception:
995 except error.RepoError as httpexception:
812 try:
996 try:
813 r = statichttprepo.instance(ui, "static-" + path, create)
997 r = statichttprepo.instance(ui, "static-" + path, create)
814 ui.note(_('(falling back to static-http)\n'))
998 ui.note(_('(falling back to static-http)\n'))
815 return r
999 return r
816 except error.RepoError:
1000 except error.RepoError:
817 raise httpexception # use the original http RepoError instead
1001 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now