##// END OF EJS Templates
httppeer: use context manager when writing temporary bundle to send...
Martin von Zweigbergk -
r43118:58f73e9c default
parent child Browse files
Show More
@@ -1,1012 +1,1009 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import io
12 import io
13 import os
13 import os
14 import socket
14 import socket
15 import struct
15 import struct
16 import weakref
16 import weakref
17
17
18 from .i18n import _
18 from .i18n import _
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 error,
21 error,
22 httpconnection,
22 httpconnection,
23 pycompat,
23 pycompat,
24 statichttprepo,
24 statichttprepo,
25 url as urlmod,
25 url as urlmod,
26 util,
26 util,
27 wireprotoframing,
27 wireprotoframing,
28 wireprototypes,
28 wireprototypes,
29 wireprotov1peer,
29 wireprotov1peer,
30 wireprotov2peer,
30 wireprotov2peer,
31 wireprotov2server,
31 wireprotov2server,
32 )
32 )
33 from .interfaces import (
33 from .interfaces import (
34 repository,
34 repository,
35 util as interfaceutil,
35 util as interfaceutil,
36 )
36 )
37 from .utils import (
37 from .utils import (
38 cborutil,
38 cborutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 httplib = util.httplib
42 httplib = util.httplib
43 urlerr = util.urlerr
43 urlerr = util.urlerr
44 urlreq = util.urlreq
44 urlreq = util.urlreq
45
45
46 def encodevalueinheaders(value, header, limit):
46 def encodevalueinheaders(value, header, limit):
47 """Encode a string value into multiple HTTP headers.
47 """Encode a string value into multiple HTTP headers.
48
48
49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 name + value will be at most ``limit`` bytes long.
51 name + value will be at most ``limit`` bytes long.
52
52
53 Returns an iterable of 2-tuples consisting of header names and
53 Returns an iterable of 2-tuples consisting of header names and
54 values as native strings.
54 values as native strings.
55 """
55 """
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 # not bytes. This function always takes bytes in as arguments.
57 # not bytes. This function always takes bytes in as arguments.
58 fmt = pycompat.strurl(header) + r'-%s'
58 fmt = pycompat.strurl(header) + r'-%s'
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 # and not a unicode: we're just getting the encoded length anyway,
60 # and not a unicode: we're just getting the encoded length anyway,
61 # and using an r-string to make it portable between Python 2 and 3
61 # and using an r-string to make it portable between Python 2 and 3
62 # doesn't work because then the \r is a literal backslash-r
62 # doesn't work because then the \r is a literal backslash-r
63 # instead of a carriage return.
63 # instead of a carriage return.
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 result = []
65 result = []
66
66
67 n = 0
67 n = 0
68 for i in pycompat.xrange(0, len(value), valuelen):
68 for i in pycompat.xrange(0, len(value), valuelen):
69 n += 1
69 n += 1
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71
71
72 return result
72 return result
73
73
74 class _multifile(object):
74 class _multifile(object):
75 def __init__(self, *fileobjs):
75 def __init__(self, *fileobjs):
76 for f in fileobjs:
76 for f in fileobjs:
77 if not util.safehasattr(f, 'length'):
77 if not util.safehasattr(f, 'length'):
78 raise ValueError(
78 raise ValueError(
79 '_multifile only supports file objects that '
79 '_multifile only supports file objects that '
80 'have a length but this one does not:', type(f), f)
80 'have a length but this one does not:', type(f), f)
81 self._fileobjs = fileobjs
81 self._fileobjs = fileobjs
82 self._index = 0
82 self._index = 0
83
83
84 @property
84 @property
85 def length(self):
85 def length(self):
86 return sum(f.length for f in self._fileobjs)
86 return sum(f.length for f in self._fileobjs)
87
87
88 def read(self, amt=None):
88 def read(self, amt=None):
89 if amt <= 0:
89 if amt <= 0:
90 return ''.join(f.read() for f in self._fileobjs)
90 return ''.join(f.read() for f in self._fileobjs)
91 parts = []
91 parts = []
92 while amt and self._index < len(self._fileobjs):
92 while amt and self._index < len(self._fileobjs):
93 parts.append(self._fileobjs[self._index].read(amt))
93 parts.append(self._fileobjs[self._index].read(amt))
94 got = len(parts[-1])
94 got = len(parts[-1])
95 if got < amt:
95 if got < amt:
96 self._index += 1
96 self._index += 1
97 amt -= got
97 amt -= got
98 return ''.join(parts)
98 return ''.join(parts)
99
99
100 def seek(self, offset, whence=os.SEEK_SET):
100 def seek(self, offset, whence=os.SEEK_SET):
101 if whence != os.SEEK_SET:
101 if whence != os.SEEK_SET:
102 raise NotImplementedError(
102 raise NotImplementedError(
103 '_multifile does not support anything other'
103 '_multifile does not support anything other'
104 ' than os.SEEK_SET for whence on seek()')
104 ' than os.SEEK_SET for whence on seek()')
105 if offset != 0:
105 if offset != 0:
106 raise NotImplementedError(
106 raise NotImplementedError(
107 '_multifile only supports seeking to start, but that '
107 '_multifile only supports seeking to start, but that '
108 'could be fixed if you need it')
108 'could be fixed if you need it')
109 for f in self._fileobjs:
109 for f in self._fileobjs:
110 f.seek(0)
110 f.seek(0)
111 self._index = 0
111 self._index = 0
112
112
113 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
113 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
114 repobaseurl, cmd, args):
114 repobaseurl, cmd, args):
115 """Make an HTTP request to run a command for a version 1 client.
115 """Make an HTTP request to run a command for a version 1 client.
116
116
117 ``caps`` is a set of known server capabilities. The value may be
117 ``caps`` is a set of known server capabilities. The value may be
118 None if capabilities are not yet known.
118 None if capabilities are not yet known.
119
119
120 ``capablefn`` is a function to evaluate a capability.
120 ``capablefn`` is a function to evaluate a capability.
121
121
122 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
122 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
123 raw data to pass to it.
123 raw data to pass to it.
124 """
124 """
125 if cmd == 'pushkey':
125 if cmd == 'pushkey':
126 args['data'] = ''
126 args['data'] = ''
127 data = args.pop('data', None)
127 data = args.pop('data', None)
128 headers = args.pop('headers', {})
128 headers = args.pop('headers', {})
129
129
130 ui.debug("sending %s command\n" % cmd)
130 ui.debug("sending %s command\n" % cmd)
131 q = [('cmd', cmd)]
131 q = [('cmd', cmd)]
132 headersize = 0
132 headersize = 0
133 # Important: don't use self.capable() here or else you end up
133 # Important: don't use self.capable() here or else you end up
134 # with infinite recursion when trying to look up capabilities
134 # with infinite recursion when trying to look up capabilities
135 # for the first time.
135 # for the first time.
136 postargsok = caps is not None and 'httppostargs' in caps
136 postargsok = caps is not None and 'httppostargs' in caps
137
137
138 # Send arguments via POST.
138 # Send arguments via POST.
139 if postargsok and args:
139 if postargsok and args:
140 strargs = urlreq.urlencode(sorted(args.items()))
140 strargs = urlreq.urlencode(sorted(args.items()))
141 if not data:
141 if not data:
142 data = strargs
142 data = strargs
143 else:
143 else:
144 if isinstance(data, bytes):
144 if isinstance(data, bytes):
145 i = io.BytesIO(data)
145 i = io.BytesIO(data)
146 i.length = len(data)
146 i.length = len(data)
147 data = i
147 data = i
148 argsio = io.BytesIO(strargs)
148 argsio = io.BytesIO(strargs)
149 argsio.length = len(strargs)
149 argsio.length = len(strargs)
150 data = _multifile(argsio, data)
150 data = _multifile(argsio, data)
151 headers[r'X-HgArgs-Post'] = len(strargs)
151 headers[r'X-HgArgs-Post'] = len(strargs)
152 elif args:
152 elif args:
153 # Calling self.capable() can infinite loop if we are calling
153 # Calling self.capable() can infinite loop if we are calling
154 # "capabilities". But that command should never accept wire
154 # "capabilities". But that command should never accept wire
155 # protocol arguments. So this should never happen.
155 # protocol arguments. So this should never happen.
156 assert cmd != 'capabilities'
156 assert cmd != 'capabilities'
157 httpheader = capablefn('httpheader')
157 httpheader = capablefn('httpheader')
158 if httpheader:
158 if httpheader:
159 headersize = int(httpheader.split(',', 1)[0])
159 headersize = int(httpheader.split(',', 1)[0])
160
160
161 # Send arguments via HTTP headers.
161 # Send arguments via HTTP headers.
162 if headersize > 0:
162 if headersize > 0:
163 # The headers can typically carry more data than the URL.
163 # The headers can typically carry more data than the URL.
164 encargs = urlreq.urlencode(sorted(args.items()))
164 encargs = urlreq.urlencode(sorted(args.items()))
165 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
165 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
166 headersize):
166 headersize):
167 headers[header] = value
167 headers[header] = value
168 # Send arguments via query string (Mercurial <1.9).
168 # Send arguments via query string (Mercurial <1.9).
169 else:
169 else:
170 q += sorted(args.items())
170 q += sorted(args.items())
171
171
172 qs = '?%s' % urlreq.urlencode(q)
172 qs = '?%s' % urlreq.urlencode(q)
173 cu = "%s%s" % (repobaseurl, qs)
173 cu = "%s%s" % (repobaseurl, qs)
174 size = 0
174 size = 0
175 if util.safehasattr(data, 'length'):
175 if util.safehasattr(data, 'length'):
176 size = data.length
176 size = data.length
177 elif data is not None:
177 elif data is not None:
178 size = len(data)
178 size = len(data)
179 if data is not None and r'Content-Type' not in headers:
179 if data is not None and r'Content-Type' not in headers:
180 headers[r'Content-Type'] = r'application/mercurial-0.1'
180 headers[r'Content-Type'] = r'application/mercurial-0.1'
181
181
182 # Tell the server we accept application/mercurial-0.2 and multiple
182 # Tell the server we accept application/mercurial-0.2 and multiple
183 # compression formats if the server is capable of emitting those
183 # compression formats if the server is capable of emitting those
184 # payloads.
184 # payloads.
185 # Note: Keep this set empty by default, as client advertisement of
185 # Note: Keep this set empty by default, as client advertisement of
186 # protocol parameters should only occur after the handshake.
186 # protocol parameters should only occur after the handshake.
187 protoparams = set()
187 protoparams = set()
188
188
189 mediatypes = set()
189 mediatypes = set()
190 if caps is not None:
190 if caps is not None:
191 mt = capablefn('httpmediatype')
191 mt = capablefn('httpmediatype')
192 if mt:
192 if mt:
193 protoparams.add('0.1')
193 protoparams.add('0.1')
194 mediatypes = set(mt.split(','))
194 mediatypes = set(mt.split(','))
195
195
196 protoparams.add('partial-pull')
196 protoparams.add('partial-pull')
197
197
198 if '0.2tx' in mediatypes:
198 if '0.2tx' in mediatypes:
199 protoparams.add('0.2')
199 protoparams.add('0.2')
200
200
201 if '0.2tx' in mediatypes and capablefn('compression'):
201 if '0.2tx' in mediatypes and capablefn('compression'):
202 # We /could/ compare supported compression formats and prune
202 # We /could/ compare supported compression formats and prune
203 # non-mutually supported or error if nothing is mutually supported.
203 # non-mutually supported or error if nothing is mutually supported.
204 # For now, send the full list to the server and have it error.
204 # For now, send the full list to the server and have it error.
205 comps = [e.wireprotosupport().name for e in
205 comps = [e.wireprotosupport().name for e in
206 util.compengines.supportedwireengines(util.CLIENTROLE)]
206 util.compengines.supportedwireengines(util.CLIENTROLE)]
207 protoparams.add('comp=%s' % ','.join(comps))
207 protoparams.add('comp=%s' % ','.join(comps))
208
208
209 if protoparams:
209 if protoparams:
210 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
210 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
211 'X-HgProto',
211 'X-HgProto',
212 headersize or 1024)
212 headersize or 1024)
213 for header, value in protoheaders:
213 for header, value in protoheaders:
214 headers[header] = value
214 headers[header] = value
215
215
216 varyheaders = []
216 varyheaders = []
217 for header in headers:
217 for header in headers:
218 if header.lower().startswith(r'x-hg'):
218 if header.lower().startswith(r'x-hg'):
219 varyheaders.append(header)
219 varyheaders.append(header)
220
220
221 if varyheaders:
221 if varyheaders:
222 headers[r'Vary'] = r','.join(sorted(varyheaders))
222 headers[r'Vary'] = r','.join(sorted(varyheaders))
223
223
224 req = requestbuilder(pycompat.strurl(cu), data, headers)
224 req = requestbuilder(pycompat.strurl(cu), data, headers)
225
225
226 if data is not None:
226 if data is not None:
227 ui.debug("sending %d bytes\n" % size)
227 ui.debug("sending %d bytes\n" % size)
228 req.add_unredirected_header(r'Content-Length', r'%d' % size)
228 req.add_unredirected_header(r'Content-Length', r'%d' % size)
229
229
230 return req, cu, qs
230 return req, cu, qs
231
231
232 def _reqdata(req):
232 def _reqdata(req):
233 """Get request data, if any. If no data, returns None."""
233 """Get request data, if any. If no data, returns None."""
234 if pycompat.ispy3:
234 if pycompat.ispy3:
235 return req.data
235 return req.data
236 if not req.has_data():
236 if not req.has_data():
237 return None
237 return None
238 return req.get_data()
238 return req.get_data()
239
239
240 def sendrequest(ui, opener, req):
240 def sendrequest(ui, opener, req):
241 """Send a prepared HTTP request.
241 """Send a prepared HTTP request.
242
242
243 Returns the response object.
243 Returns the response object.
244 """
244 """
245 dbg = ui.debug
245 dbg = ui.debug
246 if (ui.debugflag
246 if (ui.debugflag
247 and ui.configbool('devel', 'debug.peer-request')):
247 and ui.configbool('devel', 'debug.peer-request')):
248 line = 'devel-peer-request: %s\n'
248 line = 'devel-peer-request: %s\n'
249 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
249 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
250 pycompat.bytesurl(req.get_full_url())))
250 pycompat.bytesurl(req.get_full_url())))
251 hgargssize = None
251 hgargssize = None
252
252
253 for header, value in sorted(req.header_items()):
253 for header, value in sorted(req.header_items()):
254 header = pycompat.bytesurl(header)
254 header = pycompat.bytesurl(header)
255 value = pycompat.bytesurl(value)
255 value = pycompat.bytesurl(value)
256 if header.startswith('X-hgarg-'):
256 if header.startswith('X-hgarg-'):
257 if hgargssize is None:
257 if hgargssize is None:
258 hgargssize = 0
258 hgargssize = 0
259 hgargssize += len(value)
259 hgargssize += len(value)
260 else:
260 else:
261 dbg(line % ' %s %s' % (header, value))
261 dbg(line % ' %s %s' % (header, value))
262
262
263 if hgargssize is not None:
263 if hgargssize is not None:
264 dbg(line % ' %d bytes of commands arguments in headers'
264 dbg(line % ' %d bytes of commands arguments in headers'
265 % hgargssize)
265 % hgargssize)
266 data = _reqdata(req)
266 data = _reqdata(req)
267 if data is not None:
267 if data is not None:
268 length = getattr(data, 'length', None)
268 length = getattr(data, 'length', None)
269 if length is None:
269 if length is None:
270 length = len(data)
270 length = len(data)
271 dbg(line % ' %d bytes of data' % length)
271 dbg(line % ' %d bytes of data' % length)
272
272
273 start = util.timer()
273 start = util.timer()
274
274
275 res = None
275 res = None
276 try:
276 try:
277 res = opener.open(req)
277 res = opener.open(req)
278 except urlerr.httperror as inst:
278 except urlerr.httperror as inst:
279 if inst.code == 401:
279 if inst.code == 401:
280 raise error.Abort(_('authorization failed'))
280 raise error.Abort(_('authorization failed'))
281 raise
281 raise
282 except httplib.HTTPException as inst:
282 except httplib.HTTPException as inst:
283 ui.debug('http error requesting %s\n' %
283 ui.debug('http error requesting %s\n' %
284 util.hidepassword(req.get_full_url()))
284 util.hidepassword(req.get_full_url()))
285 ui.traceback()
285 ui.traceback()
286 raise IOError(None, inst)
286 raise IOError(None, inst)
287 finally:
287 finally:
288 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
288 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
289 code = res.code if res else -1
289 code = res.code if res else -1
290 dbg(line % ' finished in %.4f seconds (%d)'
290 dbg(line % ' finished in %.4f seconds (%d)'
291 % (util.timer() - start, code))
291 % (util.timer() - start, code))
292
292
293 # Insert error handlers for common I/O failures.
293 # Insert error handlers for common I/O failures.
294 urlmod.wrapresponse(res)
294 urlmod.wrapresponse(res)
295
295
296 return res
296 return res
297
297
298 class RedirectedRepoError(error.RepoError):
298 class RedirectedRepoError(error.RepoError):
299 def __init__(self, msg, respurl):
299 def __init__(self, msg, respurl):
300 super(RedirectedRepoError, self).__init__(msg)
300 super(RedirectedRepoError, self).__init__(msg)
301 self.respurl = respurl
301 self.respurl = respurl
302
302
303 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
303 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
304 allowcbor=False):
304 allowcbor=False):
305 # record the url we got redirected to
305 # record the url we got redirected to
306 redirected = False
306 redirected = False
307 respurl = pycompat.bytesurl(resp.geturl())
307 respurl = pycompat.bytesurl(resp.geturl())
308 if respurl.endswith(qs):
308 if respurl.endswith(qs):
309 respurl = respurl[:-len(qs)]
309 respurl = respurl[:-len(qs)]
310 qsdropped = False
310 qsdropped = False
311 else:
311 else:
312 qsdropped = True
312 qsdropped = True
313
313
314 if baseurl.rstrip('/') != respurl.rstrip('/'):
314 if baseurl.rstrip('/') != respurl.rstrip('/'):
315 redirected = True
315 redirected = True
316 if not ui.quiet:
316 if not ui.quiet:
317 ui.warn(_('real URL is %s\n') % respurl)
317 ui.warn(_('real URL is %s\n') % respurl)
318
318
319 try:
319 try:
320 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
320 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
321 except AttributeError:
321 except AttributeError:
322 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
322 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
323
323
324 safeurl = util.hidepassword(baseurl)
324 safeurl = util.hidepassword(baseurl)
325 if proto.startswith('application/hg-error'):
325 if proto.startswith('application/hg-error'):
326 raise error.OutOfBandError(resp.read())
326 raise error.OutOfBandError(resp.read())
327
327
328 # Pre 1.0 versions of Mercurial used text/plain and
328 # Pre 1.0 versions of Mercurial used text/plain and
329 # application/hg-changegroup. We don't support such old servers.
329 # application/hg-changegroup. We don't support such old servers.
330 if not proto.startswith('application/mercurial-'):
330 if not proto.startswith('application/mercurial-'):
331 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
331 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
332 msg = _("'%s' does not appear to be an hg repository:\n"
332 msg = _("'%s' does not appear to be an hg repository:\n"
333 "---%%<--- (%s)\n%s\n---%%<---\n") % (
333 "---%%<--- (%s)\n%s\n---%%<---\n") % (
334 safeurl, proto or 'no content-type', resp.read(1024))
334 safeurl, proto or 'no content-type', resp.read(1024))
335
335
336 # Some servers may strip the query string from the redirect. We
336 # Some servers may strip the query string from the redirect. We
337 # raise a special error type so callers can react to this specially.
337 # raise a special error type so callers can react to this specially.
338 if redirected and qsdropped:
338 if redirected and qsdropped:
339 raise RedirectedRepoError(msg, respurl)
339 raise RedirectedRepoError(msg, respurl)
340 else:
340 else:
341 raise error.RepoError(msg)
341 raise error.RepoError(msg)
342
342
343 try:
343 try:
344 subtype = proto.split('-', 1)[1]
344 subtype = proto.split('-', 1)[1]
345
345
346 # Unless we end up supporting CBOR in the legacy wire protocol,
346 # Unless we end up supporting CBOR in the legacy wire protocol,
347 # this should ONLY be encountered for the initial capabilities
347 # this should ONLY be encountered for the initial capabilities
348 # request during handshake.
348 # request during handshake.
349 if subtype == 'cbor':
349 if subtype == 'cbor':
350 if allowcbor:
350 if allowcbor:
351 return respurl, proto, resp
351 return respurl, proto, resp
352 else:
352 else:
353 raise error.RepoError(_('unexpected CBOR response from '
353 raise error.RepoError(_('unexpected CBOR response from '
354 'server'))
354 'server'))
355
355
356 version_info = tuple([int(n) for n in subtype.split('.')])
356 version_info = tuple([int(n) for n in subtype.split('.')])
357 except ValueError:
357 except ValueError:
358 raise error.RepoError(_("'%s' sent a broken Content-Type "
358 raise error.RepoError(_("'%s' sent a broken Content-Type "
359 "header (%s)") % (safeurl, proto))
359 "header (%s)") % (safeurl, proto))
360
360
361 # TODO consider switching to a decompression reader that uses
361 # TODO consider switching to a decompression reader that uses
362 # generators.
362 # generators.
363 if version_info == (0, 1):
363 if version_info == (0, 1):
364 if compressible:
364 if compressible:
365 resp = util.compengines['zlib'].decompressorreader(resp)
365 resp = util.compengines['zlib'].decompressorreader(resp)
366
366
367 elif version_info == (0, 2):
367 elif version_info == (0, 2):
368 # application/mercurial-0.2 always identifies the compression
368 # application/mercurial-0.2 always identifies the compression
369 # engine in the payload header.
369 # engine in the payload header.
370 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
370 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
371 ename = util.readexactly(resp, elen)
371 ename = util.readexactly(resp, elen)
372 engine = util.compengines.forwiretype(ename)
372 engine = util.compengines.forwiretype(ename)
373
373
374 resp = engine.decompressorreader(resp)
374 resp = engine.decompressorreader(resp)
375 else:
375 else:
376 raise error.RepoError(_("'%s' uses newer protocol %s") %
376 raise error.RepoError(_("'%s' uses newer protocol %s") %
377 (safeurl, subtype))
377 (safeurl, subtype))
378
378
379 return respurl, proto, resp
379 return respurl, proto, resp
380
380
381 class httppeer(wireprotov1peer.wirepeer):
381 class httppeer(wireprotov1peer.wirepeer):
382 def __init__(self, ui, path, url, opener, requestbuilder, caps):
382 def __init__(self, ui, path, url, opener, requestbuilder, caps):
383 self.ui = ui
383 self.ui = ui
384 self._path = path
384 self._path = path
385 self._url = url
385 self._url = url
386 self._caps = caps
386 self._caps = caps
387 self.limitedarguments = caps is not None and 'httppostargs' not in caps
387 self.limitedarguments = caps is not None and 'httppostargs' not in caps
388 self._urlopener = opener
388 self._urlopener = opener
389 self._requestbuilder = requestbuilder
389 self._requestbuilder = requestbuilder
390
390
391 def __del__(self):
391 def __del__(self):
392 for h in self._urlopener.handlers:
392 for h in self._urlopener.handlers:
393 h.close()
393 h.close()
394 getattr(h, "close_all", lambda: None)()
394 getattr(h, "close_all", lambda: None)()
395
395
396 # Begin of ipeerconnection interface.
396 # Begin of ipeerconnection interface.
397
397
398 def url(self):
398 def url(self):
399 return self._path
399 return self._path
400
400
401 def local(self):
401 def local(self):
402 return None
402 return None
403
403
404 def peer(self):
404 def peer(self):
405 return self
405 return self
406
406
407 def canpush(self):
407 def canpush(self):
408 return True
408 return True
409
409
410 def close(self):
410 def close(self):
411 try:
411 try:
412 reqs, sent, recv = (self._urlopener.requestscount,
412 reqs, sent, recv = (self._urlopener.requestscount,
413 self._urlopener.sentbytescount,
413 self._urlopener.sentbytescount,
414 self._urlopener.receivedbytescount)
414 self._urlopener.receivedbytescount)
415 except AttributeError:
415 except AttributeError:
416 return
416 return
417 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
417 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
418 'received %d bytes in responses)\n') %
418 'received %d bytes in responses)\n') %
419 (reqs, sent, recv))
419 (reqs, sent, recv))
420
420
421 # End of ipeerconnection interface.
421 # End of ipeerconnection interface.
422
422
423 # Begin of ipeercommands interface.
423 # Begin of ipeercommands interface.
424
424
425 def capabilities(self):
425 def capabilities(self):
426 return self._caps
426 return self._caps
427
427
428 # End of ipeercommands interface.
428 # End of ipeercommands interface.
429
429
430 def _callstream(self, cmd, _compressible=False, **args):
430 def _callstream(self, cmd, _compressible=False, **args):
431 args = pycompat.byteskwargs(args)
431 args = pycompat.byteskwargs(args)
432
432
433 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
433 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
434 self._caps, self.capable,
434 self._caps, self.capable,
435 self._url, cmd, args)
435 self._url, cmd, args)
436
436
437 resp = sendrequest(self.ui, self._urlopener, req)
437 resp = sendrequest(self.ui, self._urlopener, req)
438
438
439 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
439 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
440 resp, _compressible)
440 resp, _compressible)
441
441
442 return resp
442 return resp
443
443
444 def _call(self, cmd, **args):
444 def _call(self, cmd, **args):
445 fp = self._callstream(cmd, **args)
445 fp = self._callstream(cmd, **args)
446 try:
446 try:
447 return fp.read()
447 return fp.read()
448 finally:
448 finally:
449 # if using keepalive, allow connection to be reused
449 # if using keepalive, allow connection to be reused
450 fp.close()
450 fp.close()
451
451
452 def _callpush(self, cmd, cg, **args):
452 def _callpush(self, cmd, cg, **args):
453 # have to stream bundle to a temp file because we do not have
453 # have to stream bundle to a temp file because we do not have
454 # http 1.1 chunked transfer.
454 # http 1.1 chunked transfer.
455
455
456 types = self.capable('unbundle')
456 types = self.capable('unbundle')
457 try:
457 try:
458 types = types.split(',')
458 types = types.split(',')
459 except AttributeError:
459 except AttributeError:
460 # servers older than d1b16a746db6 will send 'unbundle' as a
460 # servers older than d1b16a746db6 will send 'unbundle' as a
461 # boolean capability. They only support headerless/uncompressed
461 # boolean capability. They only support headerless/uncompressed
462 # bundles.
462 # bundles.
463 types = [""]
463 types = [""]
464 for x in types:
464 for x in types:
465 if x in bundle2.bundletypes:
465 if x in bundle2.bundletypes:
466 type = x
466 type = x
467 break
467 break
468
468
469 tempname = bundle2.writebundle(self.ui, cg, None, type)
469 tempname = bundle2.writebundle(self.ui, cg, None, type)
470 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
470 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
471 headers = {r'Content-Type': r'application/mercurial-0.1'}
471 headers = {r'Content-Type': r'application/mercurial-0.1'}
472
472
473 try:
473 try:
474 r = self._call(cmd, data=fp, headers=headers, **args)
474 r = self._call(cmd, data=fp, headers=headers, **args)
475 vals = r.split('\n', 1)
475 vals = r.split('\n', 1)
476 if len(vals) < 2:
476 if len(vals) < 2:
477 raise error.ResponseError(_("unexpected response:"), r)
477 raise error.ResponseError(_("unexpected response:"), r)
478 return vals
478 return vals
479 except urlerr.httperror:
479 except urlerr.httperror:
480 # Catch and re-raise these so we don't try and treat them
480 # Catch and re-raise these so we don't try and treat them
481 # like generic socket errors. They lack any values in
481 # like generic socket errors. They lack any values in
482 # .args on Python 3 which breaks our socket.error block.
482 # .args on Python 3 which breaks our socket.error block.
483 raise
483 raise
484 except socket.error as err:
484 except socket.error as err:
485 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
485 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
486 raise error.Abort(_('push failed: %s') % err.args[1])
486 raise error.Abort(_('push failed: %s') % err.args[1])
487 raise error.Abort(err.args[1])
487 raise error.Abort(err.args[1])
488 finally:
488 finally:
489 fp.close()
489 fp.close()
490 os.unlink(tempname)
490 os.unlink(tempname)
491
491
492 def _calltwowaystream(self, cmd, fp, **args):
492 def _calltwowaystream(self, cmd, fp, **args):
493 fh = None
494 fp_ = None
493 fp_ = None
495 filename = None
494 filename = None
496 try:
495 try:
497 # dump bundle to disk
496 # dump bundle to disk
498 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
497 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
499 fh = os.fdopen(fd, r"wb")
498 with os.fdopen(fd, r"wb") as fh:
500 d = fp.read(4096)
499 d = fp.read(4096)
501 while d:
500 while d:
502 fh.write(d)
501 fh.write(d)
503 d = fp.read(4096)
502 d = fp.read(4096)
504 fh.close()
505 # start http push
503 # start http push
506 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
504 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
507 headers = {r'Content-Type': r'application/mercurial-0.1'}
505 headers = {r'Content-Type': r'application/mercurial-0.1'}
508 return self._callstream(cmd, data=fp_, headers=headers, **args)
506 return self._callstream(cmd, data=fp_, headers=headers, **args)
509 finally:
507 finally:
510 if fp_ is not None:
508 if fp_ is not None:
511 fp_.close()
509 fp_.close()
512 if fh is not None:
510 if filename is not None:
513 fh.close()
514 os.unlink(filename)
511 os.unlink(filename)
515
512
516 def _callcompressable(self, cmd, **args):
513 def _callcompressable(self, cmd, **args):
517 return self._callstream(cmd, _compressible=True, **args)
514 return self._callstream(cmd, _compressible=True, **args)
518
515
519 def _abort(self, exception):
516 def _abort(self, exception):
520 raise exception
517 raise exception
521
518
522 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests,
519 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests,
523 redirect):
520 redirect):
524 wireprotoframing.populatestreamencoders()
521 wireprotoframing.populatestreamencoders()
525
522
526 uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
523 uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
527
524
528 if uiencoders:
525 if uiencoders:
529 encoders = []
526 encoders = []
530
527
531 for encoder in uiencoders:
528 for encoder in uiencoders:
532 if encoder not in wireprotoframing.STREAM_ENCODERS:
529 if encoder not in wireprotoframing.STREAM_ENCODERS:
533 ui.warn(_(b'wire protocol version 2 encoder referenced in '
530 ui.warn(_(b'wire protocol version 2 encoder referenced in '
534 b'config (%s) is not known; ignoring\n') % encoder)
531 b'config (%s) is not known; ignoring\n') % encoder)
535 else:
532 else:
536 encoders.append(encoder)
533 encoders.append(encoder)
537
534
538 else:
535 else:
539 encoders = wireprotoframing.STREAM_ENCODERS_ORDER
536 encoders = wireprotoframing.STREAM_ENCODERS_ORDER
540
537
541 reactor = wireprotoframing.clientreactor(ui,
538 reactor = wireprotoframing.clientreactor(ui,
542 hasmultiplesend=False,
539 hasmultiplesend=False,
543 buffersends=True,
540 buffersends=True,
544 clientcontentencoders=encoders)
541 clientcontentencoders=encoders)
545
542
546 handler = wireprotov2peer.clienthandler(ui, reactor,
543 handler = wireprotov2peer.clienthandler(ui, reactor,
547 opener=opener,
544 opener=opener,
548 requestbuilder=requestbuilder)
545 requestbuilder=requestbuilder)
549
546
550 url = '%s/%s' % (apiurl, permission)
547 url = '%s/%s' % (apiurl, permission)
551
548
552 if len(requests) > 1:
549 if len(requests) > 1:
553 url += '/multirequest'
550 url += '/multirequest'
554 else:
551 else:
555 url += '/%s' % requests[0][0]
552 url += '/%s' % requests[0][0]
556
553
557 ui.debug('sending %d commands\n' % len(requests))
554 ui.debug('sending %d commands\n' % len(requests))
558 for command, args, f in requests:
555 for command, args, f in requests:
559 ui.debug('sending command %s: %s\n' % (
556 ui.debug('sending command %s: %s\n' % (
560 command, stringutil.pprint(args, indent=2)))
557 command, stringutil.pprint(args, indent=2)))
561 assert not list(handler.callcommand(command, args, f,
558 assert not list(handler.callcommand(command, args, f,
562 redirect=redirect))
559 redirect=redirect))
563
560
564 # TODO stream this.
561 # TODO stream this.
565 body = b''.join(map(bytes, handler.flushcommands()))
562 body = b''.join(map(bytes, handler.flushcommands()))
566
563
567 # TODO modify user-agent to reflect v2
564 # TODO modify user-agent to reflect v2
568 headers = {
565 headers = {
569 r'Accept': wireprotov2server.FRAMINGTYPE,
566 r'Accept': wireprotov2server.FRAMINGTYPE,
570 r'Content-Type': wireprotov2server.FRAMINGTYPE,
567 r'Content-Type': wireprotov2server.FRAMINGTYPE,
571 }
568 }
572
569
573 req = requestbuilder(pycompat.strurl(url), body, headers)
570 req = requestbuilder(pycompat.strurl(url), body, headers)
574 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
571 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
575
572
576 try:
573 try:
577 res = opener.open(req)
574 res = opener.open(req)
578 except urlerr.httperror as e:
575 except urlerr.httperror as e:
579 if e.code == 401:
576 if e.code == 401:
580 raise error.Abort(_('authorization failed'))
577 raise error.Abort(_('authorization failed'))
581
578
582 raise
579 raise
583 except httplib.HTTPException as e:
580 except httplib.HTTPException as e:
584 ui.traceback()
581 ui.traceback()
585 raise IOError(None, e)
582 raise IOError(None, e)
586
583
587 return handler, res
584 return handler, res
588
585
589 class queuedcommandfuture(pycompat.futures.Future):
586 class queuedcommandfuture(pycompat.futures.Future):
590 """Wraps result() on command futures to trigger submission on call."""
587 """Wraps result() on command futures to trigger submission on call."""
591
588
592 def result(self, timeout=None):
589 def result(self, timeout=None):
593 if self.done():
590 if self.done():
594 return pycompat.futures.Future.result(self, timeout)
591 return pycompat.futures.Future.result(self, timeout)
595
592
596 self._peerexecutor.sendcommands()
593 self._peerexecutor.sendcommands()
597
594
598 # sendcommands() will restore the original __class__ and self.result
595 # sendcommands() will restore the original __class__ and self.result
599 # will resolve to Future.result.
596 # will resolve to Future.result.
600 return self.result(timeout)
597 return self.result(timeout)
601
598
602 @interfaceutil.implementer(repository.ipeercommandexecutor)
599 @interfaceutil.implementer(repository.ipeercommandexecutor)
603 class httpv2executor(object):
600 class httpv2executor(object):
604 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor,
601 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor,
605 redirect):
602 redirect):
606 self._ui = ui
603 self._ui = ui
607 self._opener = opener
604 self._opener = opener
608 self._requestbuilder = requestbuilder
605 self._requestbuilder = requestbuilder
609 self._apiurl = apiurl
606 self._apiurl = apiurl
610 self._descriptor = descriptor
607 self._descriptor = descriptor
611 self._redirect = redirect
608 self._redirect = redirect
612 self._sent = False
609 self._sent = False
613 self._closed = False
610 self._closed = False
614 self._neededpermissions = set()
611 self._neededpermissions = set()
615 self._calls = []
612 self._calls = []
616 self._futures = weakref.WeakSet()
613 self._futures = weakref.WeakSet()
617 self._responseexecutor = None
614 self._responseexecutor = None
618 self._responsef = None
615 self._responsef = None
619
616
620 def __enter__(self):
617 def __enter__(self):
621 return self
618 return self
622
619
623 def __exit__(self, exctype, excvalue, exctb):
620 def __exit__(self, exctype, excvalue, exctb):
624 self.close()
621 self.close()
625
622
626 def callcommand(self, command, args):
623 def callcommand(self, command, args):
627 if self._sent:
624 if self._sent:
628 raise error.ProgrammingError('callcommand() cannot be used after '
625 raise error.ProgrammingError('callcommand() cannot be used after '
629 'commands are sent')
626 'commands are sent')
630
627
631 if self._closed:
628 if self._closed:
632 raise error.ProgrammingError('callcommand() cannot be used after '
629 raise error.ProgrammingError('callcommand() cannot be used after '
633 'close()')
630 'close()')
634
631
635 # The service advertises which commands are available. So if we attempt
632 # The service advertises which commands are available. So if we attempt
636 # to call an unknown command or pass an unknown argument, we can screen
633 # to call an unknown command or pass an unknown argument, we can screen
637 # for this.
634 # for this.
638 if command not in self._descriptor['commands']:
635 if command not in self._descriptor['commands']:
639 raise error.ProgrammingError(
636 raise error.ProgrammingError(
640 'wire protocol command %s is not available' % command)
637 'wire protocol command %s is not available' % command)
641
638
642 cmdinfo = self._descriptor['commands'][command]
639 cmdinfo = self._descriptor['commands'][command]
643 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
640 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
644
641
645 if unknownargs:
642 if unknownargs:
646 raise error.ProgrammingError(
643 raise error.ProgrammingError(
647 'wire protocol command %s does not accept argument: %s' % (
644 'wire protocol command %s does not accept argument: %s' % (
648 command, ', '.join(sorted(unknownargs))))
645 command, ', '.join(sorted(unknownargs))))
649
646
650 self._neededpermissions |= set(cmdinfo['permissions'])
647 self._neededpermissions |= set(cmdinfo['permissions'])
651
648
652 # TODO we /could/ also validate types here, since the API descriptor
649 # TODO we /could/ also validate types here, since the API descriptor
653 # includes types...
650 # includes types...
654
651
655 f = pycompat.futures.Future()
652 f = pycompat.futures.Future()
656
653
657 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
654 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
658 # could deadlock.
655 # could deadlock.
659 f.__class__ = queuedcommandfuture
656 f.__class__ = queuedcommandfuture
660 f._peerexecutor = self
657 f._peerexecutor = self
661
658
662 self._futures.add(f)
659 self._futures.add(f)
663 self._calls.append((command, args, f))
660 self._calls.append((command, args, f))
664
661
665 return f
662 return f
666
663
667 def sendcommands(self):
664 def sendcommands(self):
668 if self._sent:
665 if self._sent:
669 return
666 return
670
667
671 if not self._calls:
668 if not self._calls:
672 return
669 return
673
670
674 self._sent = True
671 self._sent = True
675
672
676 # Unhack any future types so caller sees a clean type and so we
673 # Unhack any future types so caller sees a clean type and so we
677 # break reference cycle.
674 # break reference cycle.
678 for f in self._futures:
675 for f in self._futures:
679 if isinstance(f, queuedcommandfuture):
676 if isinstance(f, queuedcommandfuture):
680 f.__class__ = pycompat.futures.Future
677 f.__class__ = pycompat.futures.Future
681 f._peerexecutor = None
678 f._peerexecutor = None
682
679
683 # Mark the future as running and filter out cancelled futures.
680 # Mark the future as running and filter out cancelled futures.
684 calls = [(command, args, f)
681 calls = [(command, args, f)
685 for command, args, f in self._calls
682 for command, args, f in self._calls
686 if f.set_running_or_notify_cancel()]
683 if f.set_running_or_notify_cancel()]
687
684
688 # Clear out references, prevent improper object usage.
685 # Clear out references, prevent improper object usage.
689 self._calls = None
686 self._calls = None
690
687
691 if not calls:
688 if not calls:
692 return
689 return
693
690
694 permissions = set(self._neededpermissions)
691 permissions = set(self._neededpermissions)
695
692
696 if 'push' in permissions and 'pull' in permissions:
693 if 'push' in permissions and 'pull' in permissions:
697 permissions.remove('pull')
694 permissions.remove('pull')
698
695
699 if len(permissions) > 1:
696 if len(permissions) > 1:
700 raise error.RepoError(_('cannot make request requiring multiple '
697 raise error.RepoError(_('cannot make request requiring multiple '
701 'permissions: %s') %
698 'permissions: %s') %
702 _(', ').join(sorted(permissions)))
699 _(', ').join(sorted(permissions)))
703
700
704 permission = {
701 permission = {
705 'push': 'rw',
702 'push': 'rw',
706 'pull': 'ro',
703 'pull': 'ro',
707 }[permissions.pop()]
704 }[permissions.pop()]
708
705
709 handler, resp = sendv2request(
706 handler, resp = sendv2request(
710 self._ui, self._opener, self._requestbuilder, self._apiurl,
707 self._ui, self._opener, self._requestbuilder, self._apiurl,
711 permission, calls, self._redirect)
708 permission, calls, self._redirect)
712
709
713 # TODO we probably want to validate the HTTP code, media type, etc.
710 # TODO we probably want to validate the HTTP code, media type, etc.
714
711
715 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
712 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
716 self._responsef = self._responseexecutor.submit(self._handleresponse,
713 self._responsef = self._responseexecutor.submit(self._handleresponse,
717 handler, resp)
714 handler, resp)
718
715
719 def close(self):
716 def close(self):
720 if self._closed:
717 if self._closed:
721 return
718 return
722
719
723 self.sendcommands()
720 self.sendcommands()
724
721
725 self._closed = True
722 self._closed = True
726
723
727 if not self._responsef:
724 if not self._responsef:
728 return
725 return
729
726
730 # TODO ^C here may not result in immediate program termination.
727 # TODO ^C here may not result in immediate program termination.
731
728
732 try:
729 try:
733 self._responsef.result()
730 self._responsef.result()
734 finally:
731 finally:
735 self._responseexecutor.shutdown(wait=True)
732 self._responseexecutor.shutdown(wait=True)
736 self._responsef = None
733 self._responsef = None
737 self._responseexecutor = None
734 self._responseexecutor = None
738
735
739 # If any of our futures are still in progress, mark them as
736 # If any of our futures are still in progress, mark them as
740 # errored, otherwise a result() could wait indefinitely.
737 # errored, otherwise a result() could wait indefinitely.
741 for f in self._futures:
738 for f in self._futures:
742 if not f.done():
739 if not f.done():
743 f.set_exception(error.ResponseError(
740 f.set_exception(error.ResponseError(
744 _('unfulfilled command response')))
741 _('unfulfilled command response')))
745
742
746 self._futures = None
743 self._futures = None
747
744
748 def _handleresponse(self, handler, resp):
745 def _handleresponse(self, handler, resp):
749 # Called in a thread to read the response.
746 # Called in a thread to read the response.
750
747
751 while handler.readdata(resp):
748 while handler.readdata(resp):
752 pass
749 pass
753
750
754 @interfaceutil.implementer(repository.ipeerv2)
751 @interfaceutil.implementer(repository.ipeerv2)
755 class httpv2peer(object):
752 class httpv2peer(object):
756
753
757 limitedarguments = False
754 limitedarguments = False
758
755
759 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
756 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
760 apidescriptor):
757 apidescriptor):
761 self.ui = ui
758 self.ui = ui
762 self.apidescriptor = apidescriptor
759 self.apidescriptor = apidescriptor
763
760
764 if repourl.endswith('/'):
761 if repourl.endswith('/'):
765 repourl = repourl[:-1]
762 repourl = repourl[:-1]
766
763
767 self._url = repourl
764 self._url = repourl
768 self._apipath = apipath
765 self._apipath = apipath
769 self._apiurl = '%s/%s' % (repourl, apipath)
766 self._apiurl = '%s/%s' % (repourl, apipath)
770 self._opener = opener
767 self._opener = opener
771 self._requestbuilder = requestbuilder
768 self._requestbuilder = requestbuilder
772
769
773 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
770 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
774
771
775 # Start of ipeerconnection.
772 # Start of ipeerconnection.
776
773
777 def url(self):
774 def url(self):
778 return self._url
775 return self._url
779
776
780 def local(self):
777 def local(self):
781 return None
778 return None
782
779
783 def peer(self):
780 def peer(self):
784 return self
781 return self
785
782
786 def canpush(self):
783 def canpush(self):
787 # TODO change once implemented.
784 # TODO change once implemented.
788 return False
785 return False
789
786
790 def close(self):
787 def close(self):
791 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
788 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
792 'received %d bytes in responses)\n') %
789 'received %d bytes in responses)\n') %
793 (self._opener.requestscount,
790 (self._opener.requestscount,
794 self._opener.sentbytescount,
791 self._opener.sentbytescount,
795 self._opener.receivedbytescount))
792 self._opener.receivedbytescount))
796
793
797 # End of ipeerconnection.
794 # End of ipeerconnection.
798
795
799 # Start of ipeercapabilities.
796 # Start of ipeercapabilities.
800
797
801 def capable(self, name):
798 def capable(self, name):
802 # The capabilities used internally historically map to capabilities
799 # The capabilities used internally historically map to capabilities
803 # advertised from the "capabilities" wire protocol command. However,
800 # advertised from the "capabilities" wire protocol command. However,
804 # version 2 of that command works differently.
801 # version 2 of that command works differently.
805
802
806 # Maps to commands that are available.
803 # Maps to commands that are available.
807 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
804 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
808 return True
805 return True
809
806
810 # Other concepts.
807 # Other concepts.
811 if name in ('bundle2'):
808 if name in ('bundle2'):
812 return True
809 return True
813
810
814 # Alias command-* to presence of command of that name.
811 # Alias command-* to presence of command of that name.
815 if name.startswith('command-'):
812 if name.startswith('command-'):
816 return name[len('command-'):] in self.apidescriptor['commands']
813 return name[len('command-'):] in self.apidescriptor['commands']
817
814
818 return False
815 return False
819
816
820 def requirecap(self, name, purpose):
817 def requirecap(self, name, purpose):
821 if self.capable(name):
818 if self.capable(name):
822 return
819 return
823
820
824 raise error.CapabilityError(
821 raise error.CapabilityError(
825 _('cannot %s; client or remote repository does not support the '
822 _('cannot %s; client or remote repository does not support the '
826 '\'%s\' capability') % (purpose, name))
823 '\'%s\' capability') % (purpose, name))
827
824
828 # End of ipeercapabilities.
825 # End of ipeercapabilities.
829
826
830 def _call(self, name, **args):
827 def _call(self, name, **args):
831 with self.commandexecutor() as e:
828 with self.commandexecutor() as e:
832 return e.callcommand(name, args).result()
829 return e.callcommand(name, args).result()
833
830
834 def commandexecutor(self):
831 def commandexecutor(self):
835 return httpv2executor(self.ui, self._opener, self._requestbuilder,
832 return httpv2executor(self.ui, self._opener, self._requestbuilder,
836 self._apiurl, self.apidescriptor, self._redirect)
833 self._apiurl, self.apidescriptor, self._redirect)
837
834
838 # Registry of API service names to metadata about peers that handle it.
835 # Registry of API service names to metadata about peers that handle it.
839 #
836 #
840 # The following keys are meaningful:
837 # The following keys are meaningful:
841 #
838 #
842 # init
839 # init
843 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
840 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
844 # apidescriptor) to create a peer.
841 # apidescriptor) to create a peer.
845 #
842 #
846 # priority
843 # priority
847 # Integer priority for the service. If we could choose from multiple
844 # Integer priority for the service. If we could choose from multiple
848 # services, we choose the one with the highest priority.
845 # services, we choose the one with the highest priority.
849 API_PEERS = {
846 API_PEERS = {
850 wireprototypes.HTTP_WIREPROTO_V2: {
847 wireprototypes.HTTP_WIREPROTO_V2: {
851 'init': httpv2peer,
848 'init': httpv2peer,
852 'priority': 50,
849 'priority': 50,
853 },
850 },
854 }
851 }
855
852
856 def performhandshake(ui, url, opener, requestbuilder):
853 def performhandshake(ui, url, opener, requestbuilder):
857 # The handshake is a request to the capabilities command.
854 # The handshake is a request to the capabilities command.
858
855
859 caps = None
856 caps = None
860 def capable(x):
857 def capable(x):
861 raise error.ProgrammingError('should not be called')
858 raise error.ProgrammingError('should not be called')
862
859
863 args = {}
860 args = {}
864
861
865 # The client advertises support for newer protocols by adding an
862 # The client advertises support for newer protocols by adding an
866 # X-HgUpgrade-* header with a list of supported APIs and an
863 # X-HgUpgrade-* header with a list of supported APIs and an
867 # X-HgProto-* header advertising which serializing formats it supports.
864 # X-HgProto-* header advertising which serializing formats it supports.
868 # We only support the HTTP version 2 transport and CBOR responses for
865 # We only support the HTTP version 2 transport and CBOR responses for
869 # now.
866 # now.
870 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
867 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
871
868
872 if advertisev2:
869 if advertisev2:
873 args['headers'] = {
870 args['headers'] = {
874 r'X-HgProto-1': r'cbor',
871 r'X-HgProto-1': r'cbor',
875 }
872 }
876
873
877 args['headers'].update(
874 args['headers'].update(
878 encodevalueinheaders(' '.join(sorted(API_PEERS)),
875 encodevalueinheaders(' '.join(sorted(API_PEERS)),
879 'X-HgUpgrade',
876 'X-HgUpgrade',
880 # We don't know the header limit this early.
877 # We don't know the header limit this early.
881 # So make it small.
878 # So make it small.
882 1024))
879 1024))
883
880
884 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
881 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
885 capable, url, 'capabilities',
882 capable, url, 'capabilities',
886 args)
883 args)
887 resp = sendrequest(ui, opener, req)
884 resp = sendrequest(ui, opener, req)
888
885
889 # The server may redirect us to the repo root, stripping the
886 # The server may redirect us to the repo root, stripping the
890 # ?cmd=capabilities query string from the URL. The server would likely
887 # ?cmd=capabilities query string from the URL. The server would likely
891 # return HTML in this case and ``parsev1commandresponse()`` would raise.
888 # return HTML in this case and ``parsev1commandresponse()`` would raise.
892 # We catch this special case and re-issue the capabilities request against
889 # We catch this special case and re-issue the capabilities request against
893 # the new URL.
890 # the new URL.
894 #
891 #
895 # We should ideally not do this, as a redirect that drops the query
892 # We should ideally not do this, as a redirect that drops the query
896 # string from the URL is arguably a server bug. (Garbage in, garbage out).
893 # string from the URL is arguably a server bug. (Garbage in, garbage out).
897 # However, Mercurial clients for several years appeared to handle this
894 # However, Mercurial clients for several years appeared to handle this
898 # issue without behavior degradation. And according to issue 5860, it may
895 # issue without behavior degradation. And according to issue 5860, it may
899 # be a longstanding bug in some server implementations. So we allow a
896 # be a longstanding bug in some server implementations. So we allow a
900 # redirect that drops the query string to "just work."
897 # redirect that drops the query string to "just work."
901 try:
898 try:
902 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
903 compressible=False,
900 compressible=False,
904 allowcbor=advertisev2)
901 allowcbor=advertisev2)
905 except RedirectedRepoError as e:
902 except RedirectedRepoError as e:
906 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
903 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
907 capable, e.respurl,
904 capable, e.respurl,
908 'capabilities', args)
905 'capabilities', args)
909 resp = sendrequest(ui, opener, req)
906 resp = sendrequest(ui, opener, req)
910 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
907 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
911 compressible=False,
908 compressible=False,
912 allowcbor=advertisev2)
909 allowcbor=advertisev2)
913
910
914 try:
911 try:
915 rawdata = resp.read()
912 rawdata = resp.read()
916 finally:
913 finally:
917 resp.close()
914 resp.close()
918
915
919 if not ct.startswith('application/mercurial-'):
916 if not ct.startswith('application/mercurial-'):
920 raise error.ProgrammingError('unexpected content-type: %s' % ct)
917 raise error.ProgrammingError('unexpected content-type: %s' % ct)
921
918
922 if advertisev2:
919 if advertisev2:
923 if ct == 'application/mercurial-cbor':
920 if ct == 'application/mercurial-cbor':
924 try:
921 try:
925 info = cborutil.decodeall(rawdata)[0]
922 info = cborutil.decodeall(rawdata)[0]
926 except cborutil.CBORDecodeError:
923 except cborutil.CBORDecodeError:
927 raise error.Abort(_('error decoding CBOR from remote server'),
924 raise error.Abort(_('error decoding CBOR from remote server'),
928 hint=_('try again and consider contacting '
925 hint=_('try again and consider contacting '
929 'the server operator'))
926 'the server operator'))
930
927
931 # We got a legacy response. That's fine.
928 # We got a legacy response. That's fine.
932 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
929 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
933 info = {
930 info = {
934 'v1capabilities': set(rawdata.split())
931 'v1capabilities': set(rawdata.split())
935 }
932 }
936
933
937 else:
934 else:
938 raise error.RepoError(
935 raise error.RepoError(
939 _('unexpected response type from server: %s') % ct)
936 _('unexpected response type from server: %s') % ct)
940 else:
937 else:
941 info = {
938 info = {
942 'v1capabilities': set(rawdata.split())
939 'v1capabilities': set(rawdata.split())
943 }
940 }
944
941
945 return respurl, info
942 return respurl, info
946
943
947 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
944 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
948 """Construct an appropriate HTTP peer instance.
945 """Construct an appropriate HTTP peer instance.
949
946
950 ``opener`` is an ``url.opener`` that should be used to establish
947 ``opener`` is an ``url.opener`` that should be used to establish
951 connections, perform HTTP requests.
948 connections, perform HTTP requests.
952
949
953 ``requestbuilder`` is the type used for constructing HTTP requests.
950 ``requestbuilder`` is the type used for constructing HTTP requests.
954 It exists as an argument so extensions can override the default.
951 It exists as an argument so extensions can override the default.
955 """
952 """
956 u = util.url(path)
953 u = util.url(path)
957 if u.query or u.fragment:
954 if u.query or u.fragment:
958 raise error.Abort(_('unsupported URL component: "%s"') %
955 raise error.Abort(_('unsupported URL component: "%s"') %
959 (u.query or u.fragment))
956 (u.query or u.fragment))
960
957
961 # urllib cannot handle URLs with embedded user or passwd.
958 # urllib cannot handle URLs with embedded user or passwd.
962 url, authinfo = u.authinfo()
959 url, authinfo = u.authinfo()
963 ui.debug('using %s\n' % url)
960 ui.debug('using %s\n' % url)
964
961
965 opener = opener or urlmod.opener(ui, authinfo)
962 opener = opener or urlmod.opener(ui, authinfo)
966
963
967 respurl, info = performhandshake(ui, url, opener, requestbuilder)
964 respurl, info = performhandshake(ui, url, opener, requestbuilder)
968
965
969 # Given the intersection of APIs that both we and the server support,
966 # Given the intersection of APIs that both we and the server support,
970 # sort by their advertised priority and pick the first one.
967 # sort by their advertised priority and pick the first one.
971 #
968 #
972 # TODO consider making this request-based and interface driven. For
969 # TODO consider making this request-based and interface driven. For
973 # example, the caller could say "I want a peer that does X." It's quite
970 # example, the caller could say "I want a peer that does X." It's quite
974 # possible that not all peers would do that. Since we know the service
971 # possible that not all peers would do that. Since we know the service
975 # capabilities, we could filter out services not meeting the
972 # capabilities, we could filter out services not meeting the
976 # requirements. Possibly by consulting the interfaces defined by the
973 # requirements. Possibly by consulting the interfaces defined by the
977 # peer type.
974 # peer type.
978 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
975 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
979
976
980 preferredchoices = sorted(apipeerchoices,
977 preferredchoices = sorted(apipeerchoices,
981 key=lambda x: API_PEERS[x]['priority'],
978 key=lambda x: API_PEERS[x]['priority'],
982 reverse=True)
979 reverse=True)
983
980
984 for service in preferredchoices:
981 for service in preferredchoices:
985 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
982 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
986
983
987 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
984 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
988 requestbuilder,
985 requestbuilder,
989 info['apis'][service])
986 info['apis'][service])
990
987
991 # Failed to construct an API peer. Fall back to legacy.
988 # Failed to construct an API peer. Fall back to legacy.
992 return httppeer(ui, path, respurl, opener, requestbuilder,
989 return httppeer(ui, path, respurl, opener, requestbuilder,
993 info['v1capabilities'])
990 info['v1capabilities'])
994
991
995 def instance(ui, path, create, intents=None, createopts=None):
992 def instance(ui, path, create, intents=None, createopts=None):
996 if create:
993 if create:
997 raise error.Abort(_('cannot create new http repository'))
994 raise error.Abort(_('cannot create new http repository'))
998 try:
995 try:
999 if path.startswith('https:') and not urlmod.has_https:
996 if path.startswith('https:') and not urlmod.has_https:
1000 raise error.Abort(_('Python support for SSL and HTTPS '
997 raise error.Abort(_('Python support for SSL and HTTPS '
1001 'is not installed'))
998 'is not installed'))
1002
999
1003 inst = makepeer(ui, path)
1000 inst = makepeer(ui, path)
1004
1001
1005 return inst
1002 return inst
1006 except error.RepoError as httpexception:
1003 except error.RepoError as httpexception:
1007 try:
1004 try:
1008 r = statichttprepo.instance(ui, "static-" + path, create)
1005 r = statichttprepo.instance(ui, "static-" + path, create)
1009 ui.note(_('(falling back to static-http)\n'))
1006 ui.note(_('(falling back to static-http)\n'))
1010 return r
1007 return r
1011 except error.RepoError:
1008 except error.RepoError:
1012 raise httpexception # use the original http RepoError instead
1009 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now