##// END OF EJS Templates
httppeer: do decompression inside _callstream...
Gregory Szorc -
r30464:e16e234b default
parent child Browse files
Show More
@@ -1,309 +1,311
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import os
12 import os
13 import socket
13 import socket
14 import tempfile
14 import tempfile
15 import zlib
15 import zlib
16
16
17 from .i18n import _
17 from .i18n import _
18 from .node import nullid
18 from .node import nullid
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 error,
21 error,
22 httpconnection,
22 httpconnection,
23 statichttprepo,
23 statichttprepo,
24 url,
24 url,
25 util,
25 util,
26 wireproto,
26 wireproto,
27 )
27 )
28
28
# Module-level shorthands for the HTTP/urllib helpers exposed by util.
httplib, urlerr, urlreq = util.httplib, util.urlerr, util.urlreq
32
32
def zgenerator(f):
    """Yield the zlib-decompressed content of file-like object ``f``.

    Input is pulled from ``f`` through ``util.filechunkiter``.  Each yielded
    piece is capped at 2**18 bytes of output; whatever input the decompressor
    has not consumed yet is fed back in until the current chunk is exhausted.
    The decompressor's flushed remainder is yielded last.

    An ``httplib.HTTPException`` raised while reading is re-raised as
    ``IOError``.
    """
    maxoutput = 2 ** 18
    inflater = zlib.decompressobj()
    try:
        for pending in util.filechunkiter(f):
            while pending:
                yield inflater.decompress(pending, maxoutput)
                # input left over because the output cap was reached
                pending = inflater.unconsumed_tail
    except httplib.HTTPException:
        raise IOError(None, _('connection ended unexpectedly'))
    yield inflater.flush()
43
43
class httppeer(wireproto.wirepeer):
    """Wire protocol peer that sends its commands over HTTP."""

    def __init__(self, ui, path):
        """Set up the URL opener for ``path``.

        Raises ``error.Abort`` if ``path`` carries a query string or a
        fragment.
        """
        self.path = path
        self.caps = None
        self.handler = None
        self.urlopener = None
        self.requestbuilder = None
        u = util.url(path)
        if u.query or u.fragment:
            raise error.Abort(_('unsupported URL component: "%s"') %
                              (u.query or u.fragment))

        # urllib cannot handle URLs with embedded user or passwd
        self._url, authinfo = u.authinfo()

        self.ui = ui
        self.ui.debug('using %s\n' % self._url)

        self.urlopener = url.opener(ui, authinfo)
        self.requestbuilder = urlreq.request

    def __del__(self):
        """Close every handler of the URL opener, if one was created."""
        # getattr: __init__ may have raised before the attribute was set
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                # not every handler has close_all(); fall back to a no-op
                getattr(h, "close_all", lambda : None)()

    def url(self):
        """Return the path this peer was created with."""
        return self.path

    # look up capabilities only when needed

    def _fetchcaps(self):
        """Issue the 'capabilities' command and cache the result as a set."""
        self.caps = set(self._call('capabilities').split())

    def _capabilities(self):
        """Return the cached capability set, fetching it on first use.

        If fetching raises ``error.RepoError`` the set is left empty.
        """
        if self.caps is None:
            try:
                self._fetchcaps()
            except error.RepoError:
                self.caps = set()
            self.ui.debug('capabilities: %s\n' %
                          (' '.join(self.caps or ['none'])))
        return self.caps

    def lock(self):
        """Always abort: this peer does not implement locking."""
        raise error.Abort(_('operation not supported over http'))

    def _callstream(self, cmd, _compressible=False, **args):
        """Send ``cmd`` and return a file-like object for the response.

        ``args`` may contain two special keys which are removed before the
        remaining items are encoded as command arguments: ``data`` (request
        body, a string or an object with a ``length`` attribute) and
        ``headers`` (dict of extra request headers, updated in place).

        Command arguments are sent, in order of preference, in the request
        body (cached capabilities contain 'httppostargs' and the body is
        empty or a string), in ``X-HgArg-<n>`` headers (the 'httpheader'
        capability gives a positive size), or in the query string.

        When ``_compressible`` is true the response is wrapped so that
        reading from the returned object yields zlib-decompressed data;
        otherwise the raw response object is returned.

        Raises ``error.Abort`` on HTTP 401 or a suspected proxy problem,
        ``IOError`` on ``httplib.HTTPException``, ``error.OutOfBandError``
        for an 'application/hg-error' response and ``error.RepoError`` when
        the response does not look like it comes from a compatible server.
        """
        if cmd == 'pushkey':
            args['data'] = ''
        data = args.pop('data', None)
        headers = args.pop('headers', {})

        self.ui.debug("sending %s command\n" % cmd)
        q = [('cmd', cmd)]
        headersize = 0
        # Important: don't use self.capable() here or else you end up
        # with infinite recursion when trying to look up capabilities
        # for the first time.
        postargsok = self.caps is not None and 'httppostargs' in self.caps
        # TODO: support for httppostargs when data is a file-like
        # object rather than a basestring
        canmungedata = not data or isinstance(data, basestring)
        if postargsok and canmungedata:
            strargs = urlreq.urlencode(sorted(args.items()))
            if strargs:
                if not data:
                    data = strargs
                elif isinstance(data, basestring):
                    data = strargs + data
                # length of the argument prefix inside the body
                headers['X-HgArgs-Post'] = len(strargs)
        else:
            if len(args) > 0:
                httpheader = self.capable('httpheader')
                if httpheader:
                    headersize = int(httpheader.split(',', 1)[0])
            if headersize > 0:
                # The headers can typically carry more data than the URL.
                encargs = urlreq.urlencode(sorted(args.items()))
                headerfmt = 'X-HgArg-%s'
                # room left for the value once name and framing are counted
                contentlen = headersize - len(headerfmt % '000' + ': \r\n')
                headernum = 0
                varyheaders = []
                for i in xrange(0, len(encargs), contentlen):
                    headernum += 1
                    header = headerfmt % str(headernum)
                    headers[header] = encargs[i:i + contentlen]
                    varyheaders.append(header)
                headers['Vary'] = ','.join(varyheaders)
            else:
                q += sorted(args.items())
        qs = '?%s' % urlreq.urlencode(q)
        cu = "%s%s" % (self._url, qs)
        size = 0
        if util.safehasattr(data, 'length'):
            size = data.length
        elif data is not None:
            size = len(data)
        if size and self.ui.configbool('ui', 'usehttp2', False):
            headers['Expect'] = '100-Continue'
            headers['X-HgHttp2'] = '1'
        if data is not None and 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/mercurial-0.1'
        req = self.requestbuilder(cu, data, headers)
        if data is not None:
            self.ui.debug("sending %s bytes\n" % size)
            req.add_unredirected_header('Content-Length', '%d' % size)
        try:
            resp = self.urlopener.open(req)
        except urlerr.httperror as inst:
            if inst.code == 401:
                raise error.Abort(_('authorization failed'))
            raise
        except httplib.HTTPException as inst:
            self.ui.debug('http error while sending %s command\n' % cmd)
            self.ui.traceback()
            raise IOError(None, inst)
        except IndexError:
            # this only happens with Python 2.3, later versions raise URLError
            raise error.Abort(_('http error, possibly caused by proxy setting'))
        # record the url we got redirected to
        resp_url = resp.geturl()
        if resp_url.endswith(qs):
            resp_url = resp_url[:-len(qs)]
        if self._url.rstrip('/') != resp_url.rstrip('/'):
            if not self.ui.quiet:
                self.ui.warn(_('real URL is %s\n') % resp_url)
        self._url = resp_url
        try:
            # default to '' so that a missing header reaches the "no
            # content-type" error below instead of failing on None
            proto = resp.getheader('content-type', '')
        except AttributeError:
            proto = resp.headers.get('content-type', '')

        safeurl = util.hidepassword(self._url)
        if proto.startswith('application/hg-error'):
            raise error.OutOfBandError(resp.read())
        # accept old "text/plain" and "application/hg-changegroup" for now
        if not (proto.startswith('application/mercurial-') or
                (proto.startswith('text/plain')
                 and not resp.headers.get('content-length')) or
                proto.startswith('application/hg-changegroup')):
            self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
            raise error.RepoError(
                _("'%s' does not appear to be an hg repository:\n"
                  "---%%<--- (%s)\n%s\n---%%<---\n")
                % (safeurl, proto or 'no content-type', resp.read(1024)))

        if proto.startswith('application/mercurial-'):
            try:
                version = proto.split('-', 1)[1]
                version_info = tuple([int(n) for n in version.split('.')])
            except ValueError:
                raise error.RepoError(_("'%s' sent a broken Content-Type "
                                        "header (%s)") % (safeurl, proto))
            if version_info > (0, 1):
                raise error.RepoError(_("'%s' uses newer protocol %s") %
                                      (safeurl, version))

        if _compressible:
            return util.chunkbuffer(zgenerator(resp))

        return resp

    def _call(self, cmd, **args):
        """Send ``cmd`` and return the complete response body."""
        fp = self._callstream(cmd, **args)
        try:
            return fp.read()
        finally:
            # if using keepalive, allow connection to be reused
            fp.close()

    def _callpush(self, cmd, cg, **args):
        """Write ``cg`` to a temporary bundle file and send it with ``cmd``.

        Returns the response split at the first newline into a two-item
        list.  Raises ``error.ResponseError`` when the response contains no
        newline and ``error.Abort`` on socket errors or when none of the
        bundle types advertised by the server is known locally.  The
        temporary file is removed before returning.
        """
        # have to stream bundle to a temp file because we do not have
        # http 1.1 chunked transfer.

        types = self.capable('unbundle')
        try:
            types = types.split(',')
        except AttributeError:
            # servers older than d1b16a746db6 will send 'unbundle' as a
            # boolean capability. They only support headerless/uncompressed
            # bundles.
            types = [""]
        # "" is a valid bundle type, so detect "nothing matched" with
        # for/else rather than by truthiness
        for x in types:
            if x in bundle2.bundletypes:
                bundletype = x
                break
        else:
            raise error.Abort(_('server does not advertise any supported '
                                'bundle type'))

        tempname = bundle2.writebundle(self.ui, cg, None, bundletype)
        fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
        headers = {'Content-Type': 'application/mercurial-0.1'}

        try:
            r = self._call(cmd, data=fp, headers=headers, **args)
            vals = r.split('\n', 1)
            if len(vals) < 2:
                raise error.ResponseError(_("unexpected response:"), r)
            return vals
        except socket.error as err:
            if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
                raise error.Abort(_('push failed: %s') % err.args[1])
            raise error.Abort(err.args[1])
        finally:
            fp.close()
            os.unlink(tempname)

    def _calltwowaystream(self, cmd, fp, **args):
        """Spool ``fp`` to a temporary file, send it, return the response.

        The content of ``fp`` is copied to a temporary file which is then
        used as the request body of ``cmd``.  The temporary file is closed
        and removed before this method returns or raises.
        """
        fh = None
        fp_ = None
        filename = None
        try:
            # dump bundle to disk
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
            d = fp.read(4096)
            while d:
                fh.write(d)
                d = fp.read(4096)
            fh.close()
            # start http push
            fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
            headers = {'Content-Type': 'application/mercurial-0.1'}
            return self._callstream(cmd, data=fp_, headers=headers, **args)
        finally:
            if fp_ is not None:
                fp_.close()
            if fh is not None:
                fh.close()
            # filename is still None if mkstemp() itself failed
            if filename is not None:
                os.unlink(filename)

    def _callcompressable(self, cmd, **args):
        """Send ``cmd`` and return a reader over the decompressed response."""
        return self._callstream(cmd, _compressible=True, **args)

    def _abort(self, exception):
        """Raise ``exception``."""
        raise exception
279
281
class httpspeer(httppeer):
    """``httppeer`` variant that requires ``url.has_https`` to be true."""

    def __init__(self, ui, path):
        """Initialise like ``httppeer``; abort when HTTPS is unavailable."""
        if url.has_https:
            httppeer.__init__(self, ui, path)
            return
        raise error.Abort(_('Python support for SSL and HTTPS '
                            'is not installed'))
286
288
def instance(ui, path, create):
    """Return a peer for the HTTP(S) repository at ``path``.

    An ``httpspeer`` is built when ``path`` starts with 'https:', an
    ``httppeer`` otherwise.  The new peer is probed by fetching its
    capabilities, falling back to a ``between`` call if that raises
    ``error.RepoError``.  If the probe itself fails with
    ``error.RepoError``, a static-http repository is tried instead; should
    that fail as well, the original HTTP error is raised.

    Raises ``error.Abort`` when ``create`` is true.
    """
    if create:
        raise error.Abort(_('cannot create new http repository'))
    try:
        peercls = httpspeer if path.startswith('https:') else httppeer
        peer = peercls(ui, path)
        try:
            # Try to do useful work when checking compatibility.
            # Usually saves a roundtrip since we want the caps anyway.
            peer._fetchcaps()
        except error.RepoError:
            # No luck, try older compatibility check.
            peer.between([(nullid, nullid)])
        return peer
    except error.RepoError as httpexception:
        try:
            staticrepo = statichttprepo.instance(ui, "static-" + path, create)
            ui.note(_('(falling back to static-http)\n'))
            return staticrepo
        except error.RepoError:
            # use the original http RepoError instead
            raise httpexception
General Comments 0
You need to be logged in to leave comments. Login now