##// END OF EJS Templates
httppeer: eliminate decompressresponse() proxy...
Gregory Szorc -
r32003:84569d2b default
parent child Browse files
Show More
@@ -1,422 +1,400 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import os
13 13 import socket
14 14 import struct
15 15 import tempfile
16 16
17 17 from .i18n import _
18 18 from .node import nullid
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 statichttprepo,
25 25 url,
26 26 util,
27 27 wireproto,
28 28 )
29 29
30 30 httplib = util.httplib
31 31 urlerr = util.urlerr
32 32 urlreq = util.urlreq
33 33
34 # FUTURE: consider refactoring this API to use generators. This will
35 # require a compression engine API to emit generators.
def decompressresponse(response, engine):
    """Return a decompressing reader over ``response`` built by ``engine``.

    ``httplib.HTTPException`` raised while creating the reader, or by any
    later ``read()`` call on it, is re-raised as ``IOError`` with a
    "connection ended unexpectedly" message.
    """
    def connectionended():
        return IOError(None, _('connection ended unexpectedly'))

    try:
        reader = engine.decompressorreader(response)
    except httplib.HTTPException:
        raise connectionended()

    # reader.read() has to be wrapped so HTTPException on subsequent reads
    # is converted as well. super() is deliberately avoided: if ``reader``
    # is not a new-style class it fails with
    #   TypeError: super() argument 1 must be type, not classobj
    # so the bound method is captured up front and called directly.
    rawread = reader.read

    class readerproxy(reader.__class__):
        def read(self, *args, **kwargs):
            try:
                return rawread(*args, **kwargs)
            except httplib.HTTPException:
                raise connectionended()

    # Swap the class in place so the caller keeps using the same object.
    reader.__class__ = readerproxy
    return reader
57
def encodevalueinheaders(value, header, limit):
    """Encode a string value into multiple HTTP headers.

    ``value`` will be encoded into 1 or more HTTP headers with the names
    ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
    name + value will be at most ``limit`` bytes long.

    Returns a list of 2-tuples consisting of header names and values. An
    empty ``value`` produces an empty list.

    Raises ``ValueError`` if ``limit`` is too small to leave room for any
    payload after the header name and line framing; silently returning
    nothing in that case would drop ``value`` on the floor.
    """
    fmt = header + '-%s'
    # Budget for the header name with a 3-digit counter plus the ': ' and
    # CRLF that frame each header line; the remainder is payload per header.
    valuelen = limit - len(fmt % '000') - len(': \r\n')

    if not value:
        return []
    if valuelen <= 0:
        raise ValueError('limit %d is too small to encode header %s'
                         % (limit, header))

    # ``range`` (not ``xrange``) keeps this runnable on Python 2 and 3; the
    # number of chunks is small so materializing it on Python 2 is harmless.
    return [(fmt % str(n), value[start:start + valuelen])
            for n, start in enumerate(range(0, len(value), valuelen), 1)]
77 53
def _wraphttpresponse(resp):
    """Install error-translating ``read()`` handling on an HTTP response.

    ``resp`` is modified in place: its class is replaced by a subclass whose
    ``read()`` turns ``httplib.IncompleteRead`` and other
    ``httplib.HTTPException`` failures into ``error.RichIOError`` carrying a
    message and a hint. Nothing is returned.
    """
    rawread = resp.read

    def incompletemessage(exc):
        # exc.expected is an integer if length known or None otherwise.
        if exc.expected:
            return _('HTTP request error (incomplete response; '
                     'expected %d bytes got %d)') % (exc.expected,
                                                     len(exc.partial))
        return _('HTTP request error (incomplete response)')

    class readerproxy(resp.__class__):
        def read(self, size=None):
            try:
                return rawread(size)
            except httplib.IncompleteRead as e:
                # Must be handled before the generic HTTPException clause
                # below so the more specific message is used.
                raise error.RichIOError(
                    incompletemessage(e),
                    hint=_('this may be an intermittent network failure; '
                           'if the error persists, consider contacting the '
                           'network or server operator'))
            except httplib.HTTPException as e:
                raise error.RichIOError(
                    _('HTTP request error (%s)') % e,
                    hint=_('this may be an intermittent failure; '
                           'if the error persists, consider contacting the '
                           'network or server operator'))

    # Swap the class in place so every existing reference to ``resp`` picks
    # up the wrapped read().
    resp.__class__ = readerproxy
112 88
class httppeer(wireproto.wirepeer):
    """Wire protocol peer that issues commands to a server over HTTP."""

    def __init__(self, ui, path):
        """Set up the peer for the repository URL ``path``.

        Raises ``error.Abort`` if the URL carries a query string or a
        fragment.
        """
        self.path = path
        self.caps = None
        self.handler = None
        self.urlopener = None
        self.requestbuilder = None
        u = util.url(path)
        if u.query or u.fragment:
            raise error.Abort(_('unsupported URL component: "%s"') %
                              (u.query or u.fragment))

        # urllib cannot handle URLs with embedded user or passwd
        self._url, authinfo = u.authinfo()

        self.ui = ui
        self.ui.debug('using %s\n' % self._url)

        self.urlopener = url.opener(ui, authinfo)
        self.requestbuilder = urlreq.request

    def __del__(self):
        # getattr() guards against __init__ having failed before
        # ``urlopener`` was assigned.
        urlopener = getattr(self, 'urlopener', None)
        if urlopener:
            for h in urlopener.handlers:
                h.close()
                # Not every handler provides close_all(); skip it if absent.
                getattr(h, "close_all", lambda : None)()

    def url(self):
        """Return the path this peer was created with."""
        return self.path

    # look up capabilities only when needed

    def _fetchcaps(self):
        """Query the server's capabilities and cache them in ``self.caps``."""
        self.caps = set(self._call('capabilities').split())

    def _capabilities(self):
        """Return the cached capability set, fetching it on first use.

        If the fetch raises ``error.RepoError`` the capability set is left
        empty.
        """
        if self.caps is None:
            try:
                self._fetchcaps()
            except error.RepoError:
                self.caps = set()
            self.ui.debug('capabilities: %s\n' %
                          (' '.join(self.caps or ['none'])))
        return self.caps

    def lock(self):
        """Always raise ``error.Abort``: not supported over HTTP."""
        raise error.Abort(_('operation not supported over http'))

    def _callstream(self, cmd, _compressible=False, **args):
        """Send ``cmd`` to the server and return a file-like response.

        Keyword arguments are the command's arguments, except for ``data``
        (the request body) and ``headers`` (extra request headers), which
        are removed from ``args`` before the arguments are encoded.

        The returned object is either the HTTP response itself or a
        decompressing reader over it, depending on the response media type
        and ``_compressible``.

        Raises ``error.Abort`` on HTTP 401, ``IOError`` on an
        ``httplib.HTTPException`` while opening the request,
        ``error.OutOfBandError`` for ``application/hg-error`` responses and
        ``error.RepoError`` when the response does not look like it came
        from a compatible hg server.
        """
        if cmd == 'pushkey':
            args['data'] = ''
        data = args.pop('data', None)
        headers = args.pop('headers', {})

        self.ui.debug("sending %s command\n" % cmd)
        q = [('cmd', cmd)]
        headersize = 0
        varyheaders = []
        # Important: don't use self.capable() here or else you end up
        # with infinite recursion when trying to look up capabilities
        # for the first time.
        postargsok = self.caps is not None and 'httppostargs' in self.caps
        # TODO: support for httppostargs when data is a file-like
        # object rather than a basestring
        canmungedata = not data or isinstance(data, basestring)
        if postargsok and canmungedata:
            strargs = urlreq.urlencode(sorted(args.items()))
            if strargs:
                if not data:
                    data = strargs
                elif isinstance(data, basestring):
                    data = strargs + data
                headers['X-HgArgs-Post'] = len(strargs)
        else:
            if args:
                httpheader = self.capable('httpheader')
                if httpheader:
                    headersize = int(httpheader.split(',', 1)[0])
            if headersize > 0:
                # The headers can typically carry more data than the URL.
                encargs = urlreq.urlencode(sorted(args.items()))
                for header, value in encodevalueinheaders(encargs, 'X-HgArg',
                                                          headersize):
                    headers[header] = value
                    varyheaders.append(header)
            else:
                q += sorted(args.items())
        qs = '?%s' % urlreq.urlencode(q)
        cu = "%s%s" % (self._url, qs)
        size = 0
        if util.safehasattr(data, 'length'):
            size = data.length
        elif data is not None:
            size = len(data)
        if size and self.ui.configbool('ui', 'usehttp2', False):
            headers['Expect'] = '100-Continue'
            headers['X-HgHttp2'] = '1'
        if data is not None and 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/mercurial-0.1'

        # Tell the server we accept application/mercurial-0.2 and multiple
        # compression formats if the server is capable of emitting those
        # payloads.
        protoparams = []

        mediatypes = set()
        if self.caps is not None:
            mt = self.capable('httpmediatype')
            if mt:
                protoparams.append('0.1')
                mediatypes = set(mt.split(','))

        if '0.2tx' in mediatypes:
            protoparams.append('0.2')

        if '0.2tx' in mediatypes and self.capable('compression'):
            # We /could/ compare supported compression formats and prune
            # non-mutually supported or error if nothing is mutually supported.
            # For now, send the full list to the server and have it error.
            comps = [e.wireprotosupport().name for e in
                     util.compengines.supportedwireengines(util.CLIENTROLE)]
            protoparams.append('comp=%s' % ','.join(comps))

        if protoparams:
            protoheaders = encodevalueinheaders(' '.join(protoparams),
                                                'X-HgProto',
                                                headersize or 1024)
            for header, value in protoheaders:
                headers[header] = value
                varyheaders.append(header)

        headers['Vary'] = ','.join(varyheaders)
        req = self.requestbuilder(cu, data, headers)

        if data is not None:
            self.ui.debug("sending %s bytes\n" % size)
            req.add_unredirected_header('Content-Length', '%d' % size)
        try:
            resp = self.urlopener.open(req)
        except urlerr.httperror as inst:
            if inst.code == 401:
                raise error.Abort(_('authorization failed'))
            raise
        except httplib.HTTPException as inst:
            self.ui.debug('http error while sending %s command\n' % cmd)
            self.ui.traceback()
            raise IOError(None, inst)

        # Insert error handlers for common I/O failures.
        _wraphttpresponse(resp)

        # record the url we got redirected to
        resp_url = resp.geturl()
        if resp_url.endswith(qs):
            resp_url = resp_url[:-len(qs)]
        if self._url.rstrip('/') != resp_url.rstrip('/'):
            if not self.ui.quiet:
                self.ui.warn(_('real URL is %s\n') % resp_url)
        self._url = resp_url
        try:
            proto = resp.getheader('content-type')
        except AttributeError:
            proto = resp.headers.get('content-type', '')
        # getheader() yields None when the header is absent; normalize so
        # the startswith() checks below cannot fail with AttributeError and
        # the "no content-type" error message is actually reachable.
        proto = proto or ''

        safeurl = util.hidepassword(self._url)
        if proto.startswith('application/hg-error'):
            raise error.OutOfBandError(resp.read())
        # accept old "text/plain" and "application/hg-changegroup" for now
        if not (proto.startswith('application/mercurial-') or
                (proto.startswith('text/plain')
                 and not resp.headers.get('content-length')) or
                proto.startswith('application/hg-changegroup')):
            self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
            raise error.RepoError(
                _("'%s' does not appear to be an hg repository:\n"
                  "---%%<--- (%s)\n%s\n---%%<---\n")
                % (safeurl, proto or 'no content-type', resp.read(1024)))

        # The response was already wrapped by _wraphttpresponse() above, so
        # HTTP errors raised while a decompressor reads from it are
        # converted there; the decompressor readers are returned directly.
        if proto.startswith('application/mercurial-'):
            try:
                version = proto.split('-', 1)[1]
                version_info = tuple([int(n) for n in version.split('.')])
            except ValueError:
                raise error.RepoError(_("'%s' sent a broken Content-Type "
                                        "header (%s)") % (safeurl, proto))

            # TODO consider switching to a decompression reader that uses
            # generators.
            if version_info == (0, 1):
                if _compressible:
                    return util.compengines['zlib'].decompressorreader(resp)
                return resp
            elif version_info == (0, 2):
                # application/mercurial-0.2 always identifies the compression
                # engine in the payload header.
                elen = struct.unpack('B', resp.read(1))[0]
                ename = resp.read(elen)
                engine = util.compengines.forwiretype(ename)
                return engine.decompressorreader(resp)
            else:
                raise error.RepoError(_("'%s' uses newer protocol %s") %
                                      (safeurl, version))

        if _compressible:
            return util.compengines['zlib'].decompressorreader(resp)

        return resp

    def _call(self, cmd, **args):
        """Run ``cmd`` and return the entire response body."""
        fp = self._callstream(cmd, **args)
        try:
            return fp.read()
        finally:
            # if using keepalive, allow connection to be reused
            fp.close()

    def _callpush(self, cmd, cg, **args):
        """Write ``cg`` to a temporary bundle file and send it with ``cmd``.

        Returns the response split on the first newline into a two-element
        list. Raises ``error.ResponseError`` if the response contains no
        newline and ``error.Abort`` on socket errors or when none of the
        bundle types advertised by the server is known locally. The
        temporary bundle file is always removed.
        """
        # have to stream bundle to a temp file because we do not have
        # http 1.1 chunked transfer.

        types = self.capable('unbundle')
        try:
            types = types.split(',')
        except AttributeError:
            # servers older than d1b16a746db6 will send 'unbundle' as a
            # boolean capability. They only support headerless/uncompressed
            # bundles.
            types = [""]
        for x in types:
            if x in bundle2.bundletypes:
                bundletype = x
                break
        else:
            # Without this the name would be unbound below and the failure
            # would surface as an UnboundLocalError.
            raise error.Abort(_('no common bundle type with server'))

        tempname = bundle2.writebundle(self.ui, cg, None, bundletype)
        fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
        headers = {'Content-Type': 'application/mercurial-0.1'}

        try:
            r = self._call(cmd, data=fp, headers=headers, **args)
            vals = r.split('\n', 1)
            if len(vals) < 2:
                raise error.ResponseError(_("unexpected response:"), r)
            return vals
        except socket.error as err:
            if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
                raise error.Abort(_('push failed: %s') % err.args[1])
            raise error.Abort(err.args[1])
        finally:
            fp.close()
            os.unlink(tempname)

    def _calltwowaystream(self, cmd, fp, **args):
        """Spool ``fp`` to a temporary file, send it with ``cmd`` and return
        the response stream from ``_callstream()``.

        The temporary file is removed before returning, whether or not the
        request succeeded.
        """
        fh = None
        fp_ = None
        filename = None
        try:
            # dump bundle to disk
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            d = fp.read(4096)
            while d:
                fh.write(d)
                d = fp.read(4096)
            fh.close()
            # start http push
            fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
            headers = {'Content-Type': 'application/mercurial-0.1'}
            return self._callstream(cmd, data=fp_, headers=headers, **args)
        finally:
            if fp_ is not None:
                fp_.close()
            if fh is not None:
                fh.close()
            # Key the cleanup on the file having been created rather than
            # on ``fh``: the file must go even if os.fdopen() failed, and
            # unlink must never be called with None.
            if filename is not None:
                os.unlink(filename)

    def _callcompressable(self, cmd, **args):
        """Run ``cmd`` via ``_callstream()`` with ``_compressible=True``."""
        return self._callstream(cmd, _compressible=True, **args)

    def _abort(self, exception):
        """Raise ``exception``."""
        raise exception
392 370
class httpspeer(httppeer):
    """``httppeer`` variant that refuses to start without HTTPS support."""

    def __init__(self, ui, path):
        # url.has_https is falsy when SSL/HTTPS support is unavailable;
        # fail up front instead of at the first request.
        httpsavailable = url.has_https
        if not httpsavailable:
            msg = _('Python support for SSL and HTTPS '
                    'is not installed')
            raise error.Abort(msg)
        httppeer.__init__(self, ui, path)
399 377
def instance(ui, path, create):
    """Return a peer for the HTTP(S) repository at ``path``.

    Raises ``error.Abort`` when ``create`` is true. If talking to the
    server raises ``error.RepoError``, a static-http repository is tried
    instead; should that fail too, the original HTTP error is re-raised.
    """
    if create:
        raise error.Abort(_('cannot create new http repository'))
    try:
        peercls = httpspeer if path.startswith('https:') else httppeer
        inst = peercls(ui, path)
        try:
            # Try to do useful work when checking compatibility.
            # Usually saves a roundtrip since we want the caps anyway.
            inst._fetchcaps()
        except error.RepoError:
            # No luck, try older compatibility check.
            inst.between([(nullid, nullid)])
        return inst
    except error.RepoError as httpexception:
        try:
            staticrepo = statichttprepo.instance(ui, "static-" + path, create)
        except error.RepoError:
            raise httpexception # use the original http RepoError instead
        ui.note(_('(falling back to static-http)\n'))
        return staticrepo
General Comments 0
You need to be logged in to leave comments. Login now