##// END OF EJS Templates
httppeer: move error handling and response wrapping into sendrequest...
Gregory Szorc -
r37568:b5862ee0 default
parent child Browse files
Show More
@@ -1,633 +1,635 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17
18 18 from .i18n import _
19 19 from .thirdparty import (
20 20 cbor,
21 21 )
22 22 from . import (
23 23 bundle2,
24 24 error,
25 25 httpconnection,
26 26 pycompat,
27 27 statichttprepo,
28 28 url as urlmod,
29 29 util,
30 30 wireproto,
31 31 wireprotoframing,
32 32 wireprotov2server,
33 33 )
34 34
35 35 httplib = util.httplib
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
def encodevalueinheaders(value, header, limit):
    """Split a string value across several numbered HTTP headers.

    ``value`` is chunked into 1 or more headers named ``header-<N>``,
    where ``<N>`` is an integer counting up from 1. Each header name
    plus its value is at most ``limit`` bytes long.

    Returns a list of ``(name, value)`` 2-tuples holding native strings.
    """
    # HTTP headers are ASCII and Python 3 wants them as unicodes, while
    # this function always receives bytes as arguments.
    namefmt = pycompat.strurl(header) + r'-%s'
    # Room left for the value once a three digit header number and the
    # ": " / CRLF framing are accounted for. The framing literal is
    # intentionally *not* an r-string: only its encoded length matters and
    # an r-string would turn \r into a literal backslash-r instead of a
    # carriage return.
    chunksize = limit - len(namefmt % r'000') - len(': \r\n')

    offsets = xrange(0, len(value), chunksize)
    return [(namefmt % str(num),
             pycompat.strurl(value[pos:pos + chunksize]))
            for num, pos in enumerate(offsets, 1)]
66 66
def _wraphttpresponse(resp):
    """Install common I/O error handlers on an HTTP response object.

    ``resp`` is modified in place: its class is replaced by a subclass
    whose ``read()`` turns ``httplib`` failures into
    ``error.PeerTransportError``, so that every consumer of the response
    gets the same error type and messaging.
    """
    rawread = resp.read

    def transporterror(msg):
        # All read failures share the same hint; only the message differs.
        return error.PeerTransportError(
            msg,
            hint=_('this may be an intermittent network failure; '
                   'if the error persists, consider contacting the '
                   'network or server operator'))

    class readerproxy(resp.__class__):
        def read(self, size=None):
            try:
                return rawread(size)
            except httplib.IncompleteRead as e:
                # e.expected is an integer when the length is known and
                # None otherwise.
                if not e.expected:
                    raise transporterror(
                        _('HTTP request error (incomplete response)'))
                raise transporterror(
                    _('HTTP request error (incomplete response; '
                      'expected %d bytes got %d)') % (e.expected,
                                                      len(e.partial)))
            except httplib.HTTPException as e:
                raise transporterror(_('HTTP request error (%s)') % e)

    resp.__class__ = readerproxy
101 101
class _multifile(object):
    """Present several file objects as one sequential, rewindable stream.

    Every wrapped object must expose a ``length`` attribute; the combined
    ``length`` is the sum of the individual ones. Reads drain the wrapped
    objects in the order they were passed in.
    """

    def __init__(self, *fileobjs):
        for f in fileobjs:
            if not util.safehasattr(f, 'length'):
                raise ValueError(
                    '_multifile only supports file objects that '
                    'have a length but this one does not:', type(f), f)
        self._fileobjs = fileobjs
        # Index of the file object the next sized read() pulls from.
        self._index = 0

    @property
    def length(self):
        """Total length of all wrapped file objects."""
        return sum(f.length for f in self._fileobjs)

    def read(self, amt=None):
        """Read up to ``amt`` bytes, spanning file objects as needed.

        When ``amt`` is None, zero or negative, the remaining content of
        every wrapped file object is returned.
        """
        # ``None <= 0`` is only accidentally true on Python 2 and raises
        # TypeError on Python 3, so the "read everything" default has to
        # be checked explicitly.
        if amt is None or amt <= 0:
            return ''.join(f.read() for f in self._fileobjs)
        parts = []
        while amt and self._index < len(self._fileobjs):
            parts.append(self._fileobjs[self._index].read(amt))
            got = len(parts[-1])
            if got < amt:
                # Short read: the current file object is exhausted.
                self._index += 1
            amt -= got
        return ''.join(parts)

    def seek(self, offset, whence=os.SEEK_SET):
        """Rewind every wrapped file object to its start.

        Only ``seek(0)`` with ``os.SEEK_SET`` is supported; anything else
        raises NotImplementedError.
        """
        if whence != os.SEEK_SET:
            raise NotImplementedError(
                '_multifile does not support anything other'
                ' than os.SEEK_SET for whence on seek()')
        if offset != 0:
            raise NotImplementedError(
                '_multifile only supports seeking to start, but that '
                'could be fixed if you need it')
        for f in self._fileobjs:
            f.seek(0)
        self._index = 0
140 140
def makev1commandrequest(ui, requestbuilder, caps, capablefn,
                         repobaseurl, cmd, args):
    """Build the HTTP request that runs a command for a version 1 client.

    ``caps`` is a set of known server capabilities. The value may be
    None if capabilities are not yet known.

    ``capablefn`` is a function to evaluate a capability.

    ``cmd`` is the command name and ``args`` holds its arguments. The
    optional ``data`` (raw request body) and ``headers`` (extra HTTP
    headers) entries are popped from ``args``, so the passed dict is
    modified.

    Returns a ``(request, url, querystring)`` 3-tuple.
    """
    if cmd == 'pushkey':
        args['data'] = ''
    body = args.pop('data', None)
    headers = args.pop('headers', {})

    ui.debug("sending %s command\n" % cmd)
    query = [('cmd', cmd)]
    headersize = 0
    varyheaders = []

    def addvaryheaders(pairs):
        # Headers carrying arguments or protocol parameters are also
        # listed in Vary.
        for name, val in pairs:
            headers[name] = val
            varyheaders.append(name)

    # Important: don't use self.capable() here or else you end up
    # with infinite recursion when trying to look up capabilities
    # for the first time.
    postargsok = caps is not None and 'httppostargs' in caps

    if postargsok and args:
        # Send arguments via POST.
        strargs = urlreq.urlencode(sorted(args.items()))
        if body:
            # The encoded arguments go in front of the existing body.
            if isinstance(body, bytes):
                bodyio = io.BytesIO(body)
                bodyio.length = len(body)
                body = bodyio
            argsio = io.BytesIO(strargs)
            argsio.length = len(strargs)
            body = _multifile(argsio, body)
        else:
            body = strargs
        headers[r'X-HgArgs-Post'] = len(strargs)
    elif args:
        # Calling self.capable() can infinite loop if we are calling
        # "capabilities". But that command should never accept wire
        # protocol arguments. So this should never happen.
        assert cmd != 'capabilities'
        httpheader = capablefn('httpheader')
        if httpheader:
            headersize = int(httpheader.split(',', 1)[0])

        if headersize > 0:
            # Send arguments via HTTP headers. The headers can typically
            # carry more data than the URL.
            encargs = urlreq.urlencode(sorted(args.items()))
            addvaryheaders(encodevalueinheaders(encargs, 'X-HgArg',
                                                headersize))
        else:
            # Send arguments via query string (Mercurial <1.9).
            query.extend(sorted(args.items()))

    qs = '?%s' % urlreq.urlencode(query)
    cu = "%s%s" % (repobaseurl, qs)

    if util.safehasattr(body, 'length'):
        size = body.length
    elif body is not None:
        size = len(body)
    else:
        size = 0

    if body is not None and r'Content-Type' not in headers:
        headers[r'Content-Type'] = r'application/mercurial-0.1'

    # Tell the server we accept application/mercurial-0.2 and multiple
    # compression formats if the server is capable of emitting those
    # payloads.
    protoparams = {'partial-pull'}

    mediatypes = set()
    if caps is not None:
        mt = capablefn('httpmediatype')
        if mt:
            protoparams.add('0.1')
            mediatypes = set(mt.split(','))

    if '0.2tx' in mediatypes:
        protoparams.add('0.2')

        if capablefn('compression'):
            # We /could/ compare supported compression formats and prune
            # non-mutually supported or error if nothing is mutually
            # supported. For now, send the full list to the server and
            # have it error.
            engines = util.compengines.supportedwireengines(util.CLIENTROLE)
            comps = [e.wireprotosupport().name for e in engines]
            protoparams.add('comp=%s' % ','.join(comps))

    if protoparams:
        addvaryheaders(encodevalueinheaders(' '.join(sorted(protoparams)),
                                            'X-HgProto',
                                            headersize or 1024))

    if varyheaders:
        headers[r'Vary'] = r','.join(varyheaders)

    req = requestbuilder(pycompat.strurl(cu), body, headers)

    if body is not None:
        ui.debug("sending %d bytes\n" % size)
        req.add_unredirected_header(r'Content-Length', r'%d' % size)

    return req, cu, qs
253 253
def sendrequest(ui, opener, req):
    """Send a prepared HTTP request.

    Returns the response object, wrapped so that read failures surface
    as ``error.PeerTransportError``.

    Raises ``error.Abort`` when the server answers 401; any other
    ``urlerr.httperror`` is re-raised unchanged. An
    ``httplib.HTTPException`` is logged and re-raised as ``IOError``.
    """
    # Evaluate the debug condition once. The request dump below and the
    # timing line in the ``finally`` clause must agree on it, because
    # ``dbg``, ``line`` and ``start`` only exist when it is true.
    debugrequest = (ui.debugflag
                    and ui.configbool('devel', 'debug.peer-request'))

    if debugrequest:
        dbg = ui.debug
        line = 'devel-peer-request: %s\n'
        dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
        hgargssize = None

        for header, value in sorted(req.header_items()):
            if header.startswith('X-hgarg-'):
                # Argument headers are summarized by size, not printed.
                if hgargssize is None:
                    hgargssize = 0
                hgargssize += len(value)
            else:
                dbg(line % ' %s %s' % (header, value))

        if hgargssize is not None:
            dbg(line % ' %d bytes of commands arguments in headers'
                % hgargssize)

        if req.has_data():
            data = req.get_data()
            length = getattr(data, 'length', None)
            if length is None:
                length = len(data)
            dbg(line % ' %d bytes of data' % length)

        start = util.timer()

    # Stays None when no response object could be obtained, so the
    # ``finally`` clause never touches an unbound name.
    res = None
    try:
        res = opener.open(req)
    except urlerr.httperror as inst:
        # The error object carries the HTTP status; keep it so the timing
        # line can still report it.
        res = inst
        if inst.code == 401:
            raise error.Abort(_('authorization failed'))
        raise
    except httplib.HTTPException as inst:
        ui.debug('http error requesting %s\n' %
                 util.hidepassword(req.get_full_url()))
        ui.traceback()
        raise IOError(None, inst)
    finally:
        if debugrequest:
            code = res.code if res is not None else -1
            dbg(line % ' finished in %.4f seconds (%s)'
                % (util.timer() - start, code))

    # Insert error handlers for common I/O failures.
    _wraphttpresponse(res)

    return res
293 307
class httppeer(wireproto.wirepeer):
    """Version 1 wire protocol peer that talks to a server over HTTP.

    ``path`` is the URL reported by ``url()``. ``url`` is the URL requests
    are sent to; it is updated when the server redirects. ``opener``
    performs the requests and ``requestbuilder`` constructs them.
    """

    def __init__(self, ui, path, url, opener, requestbuilder):
        self.ui = ui
        self._path = path
        self._url = url
        # Populated by _fetchcaps().
        self._caps = None
        self._urlopener = opener
        self._requestbuilder = requestbuilder

    def __del__(self):
        for h in self._urlopener.handlers:
            h.close()
            # Not every handler has close_all(); call it when present.
            getattr(h, "close_all", lambda: None)()

    # Begin of ipeerconnection interface.

    def url(self):
        return self._path

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        return True

    def close(self):
        pass

    # End of ipeerconnection interface.

    # Begin of ipeercommands interface.

    def capabilities(self):
        # self._fetchcaps() should have been called as part of peer
        # handshake. So self._caps should always be set.
        assert self._caps is not None
        return self._caps

    # End of ipeercommands interface.

    # look up capabilities only when needed

    def _fetchcaps(self):
        self._caps = set(self._call('capabilities').split())

    def _callstream(self, cmd, _compressible=False, **args):
        """Run ``cmd`` and return a file-like object over the response.

        Raises ``error.OutOfBandError`` for ``application/hg-error``
        responses and ``error.RepoError`` when the response does not look
        like it came from a Mercurial server or uses an unsupported
        protocol version.
        """
        args = pycompat.byteskwargs(args)

        req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
                                           self._caps, self.capable,
                                           self._url, cmd, args)

        resp = sendrequest(self.ui, self._urlopener, req)

        # record the url we got redirected to
        resp_url = pycompat.bytesurl(resp.geturl())
        if resp_url.endswith(qs):
            resp_url = resp_url[:-len(qs)]
        if self._url.rstrip('/') != resp_url.rstrip('/'):
            if not self.ui.quiet:
                self.ui.warn(_('real URL is %s\n') % resp_url)
        self._url = resp_url
        try:
            proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
        except AttributeError:
            # Response objects without getheader() expose the headers
            # through a ``headers`` mapping instead.
            proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))

        safeurl = util.hidepassword(self._url)
        if proto.startswith('application/hg-error'):
            raise error.OutOfBandError(resp.read())
        # accept old "text/plain" and "application/hg-changegroup" for now
        if not (proto.startswith('application/mercurial-') or
                (proto.startswith('text/plain')
                 and not resp.headers.get('content-length')) or
                proto.startswith('application/hg-changegroup')):
            self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
            raise error.RepoError(
                _("'%s' does not appear to be an hg repository:\n"
                  "---%%<--- (%s)\n%s\n---%%<---\n")
                % (safeurl, proto or 'no content-type', resp.read(1024)))

        if proto.startswith('application/mercurial-'):
            try:
                version = proto.split('-', 1)[1]
                version_info = tuple([int(n) for n in version.split('.')])
            except ValueError:
                raise error.RepoError(_("'%s' sent a broken Content-Type "
                                        "header (%s)") % (safeurl, proto))

            # TODO consider switching to a decompression reader that uses
            # generators.
            if version_info == (0, 1):
                if _compressible:
                    return util.compengines['zlib'].decompressorreader(resp)
                return resp
            elif version_info == (0, 2):
                # application/mercurial-0.2 always identifies the compression
                # engine in the payload header.
                elen = struct.unpack('B', resp.read(1))[0]
                ename = resp.read(elen)
                engine = util.compengines.forwiretype(ename)
                return engine.decompressorreader(resp)
            else:
                raise error.RepoError(_("'%s' uses newer protocol %s") %
                                      (safeurl, version))

        if _compressible:
            return util.compengines['zlib'].decompressorreader(resp)

        return resp

    def _call(self, cmd, **args):
        """Run ``cmd`` and return the whole response body."""
        fp = self._callstream(cmd, **args)
        try:
            return fp.read()
        finally:
            # if using keepalive, allow connection to be reused
            fp.close()

    def _callpush(self, cmd, cg, **args):
        """Send changegroup ``cg`` to the server with command ``cmd``.

        Returns the response split at the first newline into a 2-item
        list. Raises ``error.ResponseError`` when the response has no
        newline and ``error.Abort`` on socket errors or when none of the
        bundle types advertised by the server is supported.
        """
        # have to stream bundle to a temp file because we do not have
        # http 1.1 chunked transfer.

        types = self.capable('unbundle')
        try:
            types = types.split(',')
        except AttributeError:
            # servers older than d1b16a746db6 will send 'unbundle' as a
            # boolean capability. They only support headerless/uncompressed
            # bundles.
            types = [""]
        for x in types:
            if x in bundle2.bundletypes:
                bundletype = x
                break
        else:
            # Without this the code below would fail with an unbound local
            # instead of a meaningful error.
            raise error.Abort(_('no supported bundle type advertised by '
                                'server'))

        tempname = bundle2.writebundle(self.ui, cg, None, bundletype)
        fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
        headers = {r'Content-Type': r'application/mercurial-0.1'}

        try:
            r = self._call(cmd, data=fp, headers=headers, **args)
            vals = r.split('\n', 1)
            if len(vals) < 2:
                raise error.ResponseError(_("unexpected response:"), r)
            return vals
        except urlerr.httperror:
            # Catch and re-raise these so we don't try and treat them
            # like generic socket errors. They lack any values in
            # .args on Python 3 which breaks our socket.error block.
            raise
        except socket.error as err:
            if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
                raise error.Abort(_('push failed: %s') % err.args[1])
            raise error.Abort(err.args[1])
        finally:
            fp.close()
            os.unlink(tempname)

    def _calltwowaystream(self, cmd, fp, **args):
        """Upload the content of ``fp`` and return the response stream.

        The content is first spooled to a temporary file, which is removed
        again before returning.
        """
        fh = None
        fp_ = None
        filename = None
        try:
            # dump bundle to disk
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, r"wb")
            d = fp.read(4096)
            while d:
                fh.write(d)
                d = fp.read(4096)
            fh.close()
            # start http push
            fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
            headers = {r'Content-Type': r'application/mercurial-0.1'}
            return self._callstream(cmd, data=fp_, headers=headers, **args)
        finally:
            if fp_ is not None:
                fp_.close()
            if fh is not None:
                fh.close()
            # Only remove the temporary file when it was actually created;
            # unlinking None would raise and hide the original error.
            if filename is not None:
                os.unlink(filename)

    def _callcompressable(self, cmd, **args):
        return self._callstream(cmd, _compressible=True, **args)

    def _abort(self, exception):
        raise exception
497 499
498 500 # TODO implement interface for version 2 peers
class httpv2peer(object):
    """Peer that issues version 2 wire protocol commands over HTTP.

    ``repourl`` is the repository base URL (a trailing slash is dropped)
    and ``opener`` performs the HTTP requests.
    """

    def __init__(self, ui, repourl, opener):
        self.ui = ui

        if repourl.endswith('/'):
            repourl = repourl[:-1]

        self.url = repourl
        self._opener = opener
        # This is an its own attribute to facilitate extensions overriding
        # the default type.
        self._requestbuilder = urlreq.request

    def close(self):
        pass

    # TODO require to be part of a batched primitive, use futures.
    def _call(self, name, **args):
        """Call a wire protocol command with arguments.

        Returns a list with the decoded response data. Raises
        ``error.Abort`` when the server answers 401, ``IOError`` for an
        ``httplib.HTTPException`` and ``error.ProgrammingError`` for an
        unknown permission type or an unhandled reactor action.
        """

        # Having this early has a side-effect of importing wireprotov2server,
        # which has the side-effect of ensuring commands are registered.

        # TODO modify user-agent to reflect v2.
        headers = {
            r'Accept': wireprotov2server.FRAMINGTYPE,
            r'Content-Type': wireprotov2server.FRAMINGTYPE,
        }

        # TODO permissions should come from capabilities results.
        permission = wireproto.commandsv2[name].permission
        if permission not in ('push', 'pull'):
            raise error.ProgrammingError('unknown permission type: %s' %
                                         permission)

        permission = {
            'push': 'rw',
            'pull': 'ro',
        }[permission]

        url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
                                   permission, name)

        # TODO this should be part of a generic peer for the frame-based
        # protocol.
        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                                 buffersends=True)

        request, action, meta = reactor.callcommand(name, args)
        assert action == 'noop'

        action, meta = reactor.flushcommands()
        assert action == 'sendframes'

        body = b''.join(map(bytes, meta['framegen']))
        req = self._requestbuilder(pycompat.strurl(url), body, headers)
        req.add_unredirected_header(r'Content-Length', r'%d' % len(body))

        # TODO unify this code with httppeer.
        try:
            res = self._opener.open(req)
        except urlerr.httperror as e:
            if e.code == 401:
                raise error.Abort(_('authorization failed'))

            raise
        except httplib.HTTPException as e:
            self.ui.traceback()
            raise IOError(None, e)

        # TODO validate response type, wrap response to handle I/O errors.
        # TODO more robust frame receiver.
        results = []

        while True:
            frame = wireprotoframing.readframe(res)
            if frame is None:
                break

            self.ui.note(_('received %r\n') % frame)

            action, meta = reactor.onframerecv(frame)

            if action == 'responsedata':
                if meta['cbor']:
                    payload = util.bytesio(meta['data'])

                    decoder = cbor.CBORDecoder(payload)
                    while payload.tell() + 1 < len(meta['data']):
                        results.append(decoder.decode())
                else:
                    results.append(meta['data'])
            else:
                # The exception used to be constructed and then discarded,
                # which silently ignored anything but response data.
                raise error.ProgrammingError('unhandled action: %s' % action)

        return results
595 597
def makepeer(ui, path, requestbuilder=urlreq.request):
    """Build the HTTP peer instance for ``path``.

    ``requestbuilder`` is the type used for constructing HTTP requests.
    It is a parameter so that extensions can override the default.

    Raises ``error.Abort`` when ``path`` has a query or fragment component.
    """
    parsed = util.url(path)
    unsupported = parsed.query or parsed.fragment
    if unsupported:
        raise error.Abort(_('unsupported URL component: "%s"') %
                          unsupported)

    # urllib cannot handle URLs with embedded user or passwd.
    url, authinfo = parsed.authinfo()
    ui.debug('using %s\n' % url)

    return httppeer(ui, path, url, urlmod.opener(ui, authinfo),
                    requestbuilder)
614 616
def instance(ui, path, create):
    """Return a peer for the HTTP repository at ``path``.

    Creating repositories over HTTP is not supported, so a true ``create``
    aborts. When setting up the HTTP peer fails with ``error.RepoError``,
    a static-http repository is tried instead; if that fails as well, the
    original HTTP error is raised.
    """
    if create:
        raise error.Abort(_('cannot create new http repository'))

    try:
        if path.startswith('https:') and not urlmod.has_https:
            raise error.Abort(_('Python support for SSL and HTTPS '
                                'is not installed'))

        peer = makepeer(ui, path)
        peer._fetchcaps()
    except error.RepoError as httpexception:
        try:
            fallback = statichttprepo.instance(ui, "static-" + path, create)
        except error.RepoError:
            # use the original http RepoError instead
            raise httpexception
        ui.note(_('(falling back to static-http)\n'))
        return fallback
    else:
        return peer
General Comments 0
You need to be logged in to leave comments. Login now