##// END OF EJS Templates
httppeer: use compression engine API for decompressing responses...
Gregory Szorc -
r30465:40a1871e default
parent child Browse files
Show More
@@ -1,311 +1,320
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import os
12 import os
13 import socket
13 import socket
14 import tempfile
14 import tempfile
15 import zlib
16
15
17 from .i18n import _
16 from .i18n import _
18 from .node import nullid
17 from .node import nullid
19 from . import (
18 from . import (
20 bundle2,
19 bundle2,
21 error,
20 error,
22 httpconnection,
21 httpconnection,
23 statichttprepo,
22 statichttprepo,
24 url,
23 url,
25 util,
24 util,
26 wireproto,
25 wireproto,
27 )
26 )
28
27
29 httplib = util.httplib
28 httplib = util.httplib
30 urlerr = util.urlerr
29 urlerr = util.urlerr
31 urlreq = util.urlreq
30 urlreq = util.urlreq
32
31
33 def zgenerator(f):
32 # FUTURE: consider refactoring this API to use generators. This will
34 zd = zlib.decompressobj()
33 # require a compression engine API to emit generators.
34 def decompressresponse(response, engine):
35 try:
35 try:
36 for chunk in util.filechunkiter(f):
36 reader = engine.decompressorreader(response)
37 while chunk:
38 yield zd.decompress(chunk, 2**18)
39 chunk = zd.unconsumed_tail
40 except httplib.HTTPException:
37 except httplib.HTTPException:
41 raise IOError(None, _('connection ended unexpectedly'))
38 raise IOError(None, _('connection ended unexpectedly'))
42 yield zd.flush()
39
40 # We need to wrap reader.read() so HTTPException on subsequent
41 # reads is also converted.
42 origread = reader.read
43 class readerproxy(reader.__class__):
44 def read(self, *args, **kwargs):
45 try:
46 return origread(*args, **kwargs)
47 except httplib.HTTPException:
48 raise IOError(None, _('connection ended unexpectedly'))
49
50 reader.__class__ = readerproxy
51 return reader
43
52
44 class httppeer(wireproto.wirepeer):
53 class httppeer(wireproto.wirepeer):
45 def __init__(self, ui, path):
54 def __init__(self, ui, path):
46 self.path = path
55 self.path = path
47 self.caps = None
56 self.caps = None
48 self.handler = None
57 self.handler = None
49 self.urlopener = None
58 self.urlopener = None
50 self.requestbuilder = None
59 self.requestbuilder = None
51 u = util.url(path)
60 u = util.url(path)
52 if u.query or u.fragment:
61 if u.query or u.fragment:
53 raise error.Abort(_('unsupported URL component: "%s"') %
62 raise error.Abort(_('unsupported URL component: "%s"') %
54 (u.query or u.fragment))
63 (u.query or u.fragment))
55
64
56 # urllib cannot handle URLs with embedded user or passwd
65 # urllib cannot handle URLs with embedded user or passwd
57 self._url, authinfo = u.authinfo()
66 self._url, authinfo = u.authinfo()
58
67
59 self.ui = ui
68 self.ui = ui
60 self.ui.debug('using %s\n' % self._url)
69 self.ui.debug('using %s\n' % self._url)
61
70
62 self.urlopener = url.opener(ui, authinfo)
71 self.urlopener = url.opener(ui, authinfo)
63 self.requestbuilder = urlreq.request
72 self.requestbuilder = urlreq.request
64
73
65 def __del__(self):
74 def __del__(self):
66 urlopener = getattr(self, 'urlopener', None)
75 urlopener = getattr(self, 'urlopener', None)
67 if urlopener:
76 if urlopener:
68 for h in urlopener.handlers:
77 for h in urlopener.handlers:
69 h.close()
78 h.close()
70 getattr(h, "close_all", lambda : None)()
79 getattr(h, "close_all", lambda : None)()
71
80
72 def url(self):
81 def url(self):
73 return self.path
82 return self.path
74
83
75 # look up capabilities only when needed
84 # look up capabilities only when needed
76
85
77 def _fetchcaps(self):
86 def _fetchcaps(self):
78 self.caps = set(self._call('capabilities').split())
87 self.caps = set(self._call('capabilities').split())
79
88
80 def _capabilities(self):
89 def _capabilities(self):
81 if self.caps is None:
90 if self.caps is None:
82 try:
91 try:
83 self._fetchcaps()
92 self._fetchcaps()
84 except error.RepoError:
93 except error.RepoError:
85 self.caps = set()
94 self.caps = set()
86 self.ui.debug('capabilities: %s\n' %
95 self.ui.debug('capabilities: %s\n' %
87 (' '.join(self.caps or ['none'])))
96 (' '.join(self.caps or ['none'])))
88 return self.caps
97 return self.caps
89
98
90 def lock(self):
99 def lock(self):
91 raise error.Abort(_('operation not supported over http'))
100 raise error.Abort(_('operation not supported over http'))
92
101
93 def _callstream(self, cmd, _compressible=False, **args):
102 def _callstream(self, cmd, _compressible=False, **args):
94 if cmd == 'pushkey':
103 if cmd == 'pushkey':
95 args['data'] = ''
104 args['data'] = ''
96 data = args.pop('data', None)
105 data = args.pop('data', None)
97 headers = args.pop('headers', {})
106 headers = args.pop('headers', {})
98
107
99 self.ui.debug("sending %s command\n" % cmd)
108 self.ui.debug("sending %s command\n" % cmd)
100 q = [('cmd', cmd)]
109 q = [('cmd', cmd)]
101 headersize = 0
110 headersize = 0
102 # Important: don't use self.capable() here or else you end up
111 # Important: don't use self.capable() here or else you end up
103 # with infinite recursion when trying to look up capabilities
112 # with infinite recursion when trying to look up capabilities
104 # for the first time.
113 # for the first time.
105 postargsok = self.caps is not None and 'httppostargs' in self.caps
114 postargsok = self.caps is not None and 'httppostargs' in self.caps
106 # TODO: support for httppostargs when data is a file-like
115 # TODO: support for httppostargs when data is a file-like
107 # object rather than a basestring
116 # object rather than a basestring
108 canmungedata = not data or isinstance(data, basestring)
117 canmungedata = not data or isinstance(data, basestring)
109 if postargsok and canmungedata:
118 if postargsok and canmungedata:
110 strargs = urlreq.urlencode(sorted(args.items()))
119 strargs = urlreq.urlencode(sorted(args.items()))
111 if strargs:
120 if strargs:
112 if not data:
121 if not data:
113 data = strargs
122 data = strargs
114 elif isinstance(data, basestring):
123 elif isinstance(data, basestring):
115 data = strargs + data
124 data = strargs + data
116 headers['X-HgArgs-Post'] = len(strargs)
125 headers['X-HgArgs-Post'] = len(strargs)
117 else:
126 else:
118 if len(args) > 0:
127 if len(args) > 0:
119 httpheader = self.capable('httpheader')
128 httpheader = self.capable('httpheader')
120 if httpheader:
129 if httpheader:
121 headersize = int(httpheader.split(',', 1)[0])
130 headersize = int(httpheader.split(',', 1)[0])
122 if headersize > 0:
131 if headersize > 0:
123 # The headers can typically carry more data than the URL.
132 # The headers can typically carry more data than the URL.
124 encargs = urlreq.urlencode(sorted(args.items()))
133 encargs = urlreq.urlencode(sorted(args.items()))
125 headerfmt = 'X-HgArg-%s'
134 headerfmt = 'X-HgArg-%s'
126 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
135 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
127 headernum = 0
136 headernum = 0
128 varyheaders = []
137 varyheaders = []
129 for i in xrange(0, len(encargs), contentlen):
138 for i in xrange(0, len(encargs), contentlen):
130 headernum += 1
139 headernum += 1
131 header = headerfmt % str(headernum)
140 header = headerfmt % str(headernum)
132 headers[header] = encargs[i:i + contentlen]
141 headers[header] = encargs[i:i + contentlen]
133 varyheaders.append(header)
142 varyheaders.append(header)
134 headers['Vary'] = ','.join(varyheaders)
143 headers['Vary'] = ','.join(varyheaders)
135 else:
144 else:
136 q += sorted(args.items())
145 q += sorted(args.items())
137 qs = '?%s' % urlreq.urlencode(q)
146 qs = '?%s' % urlreq.urlencode(q)
138 cu = "%s%s" % (self._url, qs)
147 cu = "%s%s" % (self._url, qs)
139 size = 0
148 size = 0
140 if util.safehasattr(data, 'length'):
149 if util.safehasattr(data, 'length'):
141 size = data.length
150 size = data.length
142 elif data is not None:
151 elif data is not None:
143 size = len(data)
152 size = len(data)
144 if size and self.ui.configbool('ui', 'usehttp2', False):
153 if size and self.ui.configbool('ui', 'usehttp2', False):
145 headers['Expect'] = '100-Continue'
154 headers['Expect'] = '100-Continue'
146 headers['X-HgHttp2'] = '1'
155 headers['X-HgHttp2'] = '1'
147 if data is not None and 'Content-Type' not in headers:
156 if data is not None and 'Content-Type' not in headers:
148 headers['Content-Type'] = 'application/mercurial-0.1'
157 headers['Content-Type'] = 'application/mercurial-0.1'
149 req = self.requestbuilder(cu, data, headers)
158 req = self.requestbuilder(cu, data, headers)
150 if data is not None:
159 if data is not None:
151 self.ui.debug("sending %s bytes\n" % size)
160 self.ui.debug("sending %s bytes\n" % size)
152 req.add_unredirected_header('Content-Length', '%d' % size)
161 req.add_unredirected_header('Content-Length', '%d' % size)
153 try:
162 try:
154 resp = self.urlopener.open(req)
163 resp = self.urlopener.open(req)
155 except urlerr.httperror as inst:
164 except urlerr.httperror as inst:
156 if inst.code == 401:
165 if inst.code == 401:
157 raise error.Abort(_('authorization failed'))
166 raise error.Abort(_('authorization failed'))
158 raise
167 raise
159 except httplib.HTTPException as inst:
168 except httplib.HTTPException as inst:
160 self.ui.debug('http error while sending %s command\n' % cmd)
169 self.ui.debug('http error while sending %s command\n' % cmd)
161 self.ui.traceback()
170 self.ui.traceback()
162 raise IOError(None, inst)
171 raise IOError(None, inst)
163 except IndexError:
172 except IndexError:
164 # this only happens with Python 2.3, later versions raise URLError
173 # this only happens with Python 2.3, later versions raise URLError
165 raise error.Abort(_('http error, possibly caused by proxy setting'))
174 raise error.Abort(_('http error, possibly caused by proxy setting'))
166 # record the url we got redirected to
175 # record the url we got redirected to
167 resp_url = resp.geturl()
176 resp_url = resp.geturl()
168 if resp_url.endswith(qs):
177 if resp_url.endswith(qs):
169 resp_url = resp_url[:-len(qs)]
178 resp_url = resp_url[:-len(qs)]
170 if self._url.rstrip('/') != resp_url.rstrip('/'):
179 if self._url.rstrip('/') != resp_url.rstrip('/'):
171 if not self.ui.quiet:
180 if not self.ui.quiet:
172 self.ui.warn(_('real URL is %s\n') % resp_url)
181 self.ui.warn(_('real URL is %s\n') % resp_url)
173 self._url = resp_url
182 self._url = resp_url
174 try:
183 try:
175 proto = resp.getheader('content-type')
184 proto = resp.getheader('content-type')
176 except AttributeError:
185 except AttributeError:
177 proto = resp.headers.get('content-type', '')
186 proto = resp.headers.get('content-type', '')
178
187
179 safeurl = util.hidepassword(self._url)
188 safeurl = util.hidepassword(self._url)
180 if proto.startswith('application/hg-error'):
189 if proto.startswith('application/hg-error'):
181 raise error.OutOfBandError(resp.read())
190 raise error.OutOfBandError(resp.read())
182 # accept old "text/plain" and "application/hg-changegroup" for now
191 # accept old "text/plain" and "application/hg-changegroup" for now
183 if not (proto.startswith('application/mercurial-') or
192 if not (proto.startswith('application/mercurial-') or
184 (proto.startswith('text/plain')
193 (proto.startswith('text/plain')
185 and not resp.headers.get('content-length')) or
194 and not resp.headers.get('content-length')) or
186 proto.startswith('application/hg-changegroup')):
195 proto.startswith('application/hg-changegroup')):
187 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
196 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
188 raise error.RepoError(
197 raise error.RepoError(
189 _("'%s' does not appear to be an hg repository:\n"
198 _("'%s' does not appear to be an hg repository:\n"
190 "---%%<--- (%s)\n%s\n---%%<---\n")
199 "---%%<--- (%s)\n%s\n---%%<---\n")
191 % (safeurl, proto or 'no content-type', resp.read(1024)))
200 % (safeurl, proto or 'no content-type', resp.read(1024)))
192
201
193 if proto.startswith('application/mercurial-'):
202 if proto.startswith('application/mercurial-'):
194 try:
203 try:
195 version = proto.split('-', 1)[1]
204 version = proto.split('-', 1)[1]
196 version_info = tuple([int(n) for n in version.split('.')])
205 version_info = tuple([int(n) for n in version.split('.')])
197 except ValueError:
206 except ValueError:
198 raise error.RepoError(_("'%s' sent a broken Content-Type "
207 raise error.RepoError(_("'%s' sent a broken Content-Type "
199 "header (%s)") % (safeurl, proto))
208 "header (%s)") % (safeurl, proto))
200 if version_info > (0, 1):
209 if version_info > (0, 1):
201 raise error.RepoError(_("'%s' uses newer protocol %s") %
210 raise error.RepoError(_("'%s' uses newer protocol %s") %
202 (safeurl, version))
211 (safeurl, version))
203
212
204 if _compressible:
213 if _compressible:
205 return util.chunkbuffer(zgenerator(resp))
214 return decompressresponse(resp, util.compengines['zlib'])
206
215
207 return resp
216 return resp
208
217
209 def _call(self, cmd, **args):
218 def _call(self, cmd, **args):
210 fp = self._callstream(cmd, **args)
219 fp = self._callstream(cmd, **args)
211 try:
220 try:
212 return fp.read()
221 return fp.read()
213 finally:
222 finally:
214 # if using keepalive, allow connection to be reused
223 # if using keepalive, allow connection to be reused
215 fp.close()
224 fp.close()
216
225
217 def _callpush(self, cmd, cg, **args):
226 def _callpush(self, cmd, cg, **args):
218 # have to stream bundle to a temp file because we do not have
227 # have to stream bundle to a temp file because we do not have
219 # http 1.1 chunked transfer.
228 # http 1.1 chunked transfer.
220
229
221 types = self.capable('unbundle')
230 types = self.capable('unbundle')
222 try:
231 try:
223 types = types.split(',')
232 types = types.split(',')
224 except AttributeError:
233 except AttributeError:
225 # servers older than d1b16a746db6 will send 'unbundle' as a
234 # servers older than d1b16a746db6 will send 'unbundle' as a
226 # boolean capability. They only support headerless/uncompressed
235 # boolean capability. They only support headerless/uncompressed
227 # bundles.
236 # bundles.
228 types = [""]
237 types = [""]
229 for x in types:
238 for x in types:
230 if x in bundle2.bundletypes:
239 if x in bundle2.bundletypes:
231 type = x
240 type = x
232 break
241 break
233
242
234 tempname = bundle2.writebundle(self.ui, cg, None, type)
243 tempname = bundle2.writebundle(self.ui, cg, None, type)
235 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
244 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
236 headers = {'Content-Type': 'application/mercurial-0.1'}
245 headers = {'Content-Type': 'application/mercurial-0.1'}
237
246
238 try:
247 try:
239 r = self._call(cmd, data=fp, headers=headers, **args)
248 r = self._call(cmd, data=fp, headers=headers, **args)
240 vals = r.split('\n', 1)
249 vals = r.split('\n', 1)
241 if len(vals) < 2:
250 if len(vals) < 2:
242 raise error.ResponseError(_("unexpected response:"), r)
251 raise error.ResponseError(_("unexpected response:"), r)
243 return vals
252 return vals
244 except socket.error as err:
253 except socket.error as err:
245 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
254 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
246 raise error.Abort(_('push failed: %s') % err.args[1])
255 raise error.Abort(_('push failed: %s') % err.args[1])
247 raise error.Abort(err.args[1])
256 raise error.Abort(err.args[1])
248 finally:
257 finally:
249 fp.close()
258 fp.close()
250 os.unlink(tempname)
259 os.unlink(tempname)
251
260
252 def _calltwowaystream(self, cmd, fp, **args):
261 def _calltwowaystream(self, cmd, fp, **args):
253 fh = None
262 fh = None
254 fp_ = None
263 fp_ = None
255 filename = None
264 filename = None
256 try:
265 try:
257 # dump bundle to disk
266 # dump bundle to disk
258 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
267 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
259 fh = os.fdopen(fd, "wb")
268 fh = os.fdopen(fd, "wb")
260 d = fp.read(4096)
269 d = fp.read(4096)
261 while d:
270 while d:
262 fh.write(d)
271 fh.write(d)
263 d = fp.read(4096)
272 d = fp.read(4096)
264 fh.close()
273 fh.close()
265 # start http push
274 # start http push
266 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
275 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
267 headers = {'Content-Type': 'application/mercurial-0.1'}
276 headers = {'Content-Type': 'application/mercurial-0.1'}
268 return self._callstream(cmd, data=fp_, headers=headers, **args)
277 return self._callstream(cmd, data=fp_, headers=headers, **args)
269 finally:
278 finally:
270 if fp_ is not None:
279 if fp_ is not None:
271 fp_.close()
280 fp_.close()
272 if fh is not None:
281 if fh is not None:
273 fh.close()
282 fh.close()
274 os.unlink(filename)
283 os.unlink(filename)
275
284
276 def _callcompressable(self, cmd, **args):
285 def _callcompressable(self, cmd, **args):
277 return self._callstream(cmd, _compressible=True, **args)
286 return self._callstream(cmd, _compressible=True, **args)
278
287
279 def _abort(self, exception):
288 def _abort(self, exception):
280 raise exception
289 raise exception
281
290
282 class httpspeer(httppeer):
291 class httpspeer(httppeer):
283 def __init__(self, ui, path):
292 def __init__(self, ui, path):
284 if not url.has_https:
293 if not url.has_https:
285 raise error.Abort(_('Python support for SSL and HTTPS '
294 raise error.Abort(_('Python support for SSL and HTTPS '
286 'is not installed'))
295 'is not installed'))
287 httppeer.__init__(self, ui, path)
296 httppeer.__init__(self, ui, path)
288
297
289 def instance(ui, path, create):
298 def instance(ui, path, create):
290 if create:
299 if create:
291 raise error.Abort(_('cannot create new http repository'))
300 raise error.Abort(_('cannot create new http repository'))
292 try:
301 try:
293 if path.startswith('https:'):
302 if path.startswith('https:'):
294 inst = httpspeer(ui, path)
303 inst = httpspeer(ui, path)
295 else:
304 else:
296 inst = httppeer(ui, path)
305 inst = httppeer(ui, path)
297 try:
306 try:
298 # Try to do useful work when checking compatibility.
307 # Try to do useful work when checking compatibility.
299 # Usually saves a roundtrip since we want the caps anyway.
308 # Usually saves a roundtrip since we want the caps anyway.
300 inst._fetchcaps()
309 inst._fetchcaps()
301 except error.RepoError:
310 except error.RepoError:
302 # No luck, try older compatibility check.
311 # No luck, try older compatibility check.
303 inst.between([(nullid, nullid)])
312 inst.between([(nullid, nullid)])
304 return inst
313 return inst
305 except error.RepoError as httpexception:
314 except error.RepoError as httpexception:
306 try:
315 try:
307 r = statichttprepo.instance(ui, "static-" + path, create)
316 r = statichttprepo.instance(ui, "static-" + path, create)
308 ui.note(_('(falling back to static-http)\n'))
317 ui.note(_('(falling back to static-http)\n'))
309 return r
318 return r
310 except error.RepoError:
319 except error.RepoError:
311 raise httpexception # use the original http RepoError instead
320 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now