##// END OF EJS Templates
httppeer: handle error response from client reactor...
Gregory Szorc -
r37672:8cea0d57 default
parent child Browse files
Show More
@@ -1,1001 +1,1009 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import sys
17 17 import tempfile
18 18 import weakref
19 19
20 20 from .i18n import _
21 21 from .thirdparty import (
22 22 cbor,
23 23 )
24 24 from .thirdparty.zope import (
25 25 interface as zi,
26 26 )
27 27 from . import (
28 28 bundle2,
29 29 error,
30 30 httpconnection,
31 31 pycompat,
32 32 repository,
33 33 statichttprepo,
34 34 url as urlmod,
35 35 util,
36 36 wireprotoframing,
37 37 wireprototypes,
38 38 wireprotov1peer,
39 39 wireprotov2server,
40 40 )
41 41
42 42 httplib = util.httplib
43 43 urlerr = util.urlerr
44 44 urlreq = util.urlreq
45 45
46 46 def encodevalueinheaders(value, header, limit):
47 47 """Encode a string value into multiple HTTP headers.
48 48
49 49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 51 name + value will be at most ``limit`` bytes long.
52 52
53 53 Returns an iterable of 2-tuples consisting of header names and
54 54 values as native strings.
55 55 """
56 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 57 # not bytes. This function always takes bytes in as arguments.
58 58 fmt = pycompat.strurl(header) + r'-%s'
59 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 60 # and not a unicode: we're just getting the encoded length anyway,
61 61 # and using an r-string to make it portable between Python 2 and 3
62 62 # doesn't work because then the \r is a literal backslash-r
63 63 # instead of a carriage return.
64 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 65 result = []
66 66
67 67 n = 0
68 68 for i in xrange(0, len(value), valuelen):
69 69 n += 1
70 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 71
72 72 return result
73 73
74 74 def _wraphttpresponse(resp):
75 75 """Wrap an HTTPResponse with common error handlers.
76 76
77 77 This ensures that any I/O from any consumer raises the appropriate
78 78 error and messaging.
79 79 """
80 80 origread = resp.read
81 81
82 82 class readerproxy(resp.__class__):
83 83 def read(self, size=None):
84 84 try:
85 85 return origread(size)
86 86 except httplib.IncompleteRead as e:
87 87 # e.expected is an integer if length known or None otherwise.
88 88 if e.expected:
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (e.expected,
91 91 len(e.partial))
92 92 else:
93 93 msg = _('HTTP request error (incomplete response)')
94 94
95 95 raise error.PeerTransportError(
96 96 msg,
97 97 hint=_('this may be an intermittent network failure; '
98 98 'if the error persists, consider contacting the '
99 99 'network or server operator'))
100 100 except httplib.HTTPException as e:
101 101 raise error.PeerTransportError(
102 102 _('HTTP request error (%s)') % e,
103 103 hint=_('this may be an intermittent network failure; '
104 104 'if the error persists, consider contacting the '
105 105 'network or server operator'))
106 106
107 107 resp.__class__ = readerproxy
108 108
109 109 class _multifile(object):
110 110 def __init__(self, *fileobjs):
111 111 for f in fileobjs:
112 112 if not util.safehasattr(f, 'length'):
113 113 raise ValueError(
114 114 '_multifile only supports file objects that '
115 115 'have a length but this one does not:', type(f), f)
116 116 self._fileobjs = fileobjs
117 117 self._index = 0
118 118
119 119 @property
120 120 def length(self):
121 121 return sum(f.length for f in self._fileobjs)
122 122
123 123 def read(self, amt=None):
124 124 if amt <= 0:
125 125 return ''.join(f.read() for f in self._fileobjs)
126 126 parts = []
127 127 while amt and self._index < len(self._fileobjs):
128 128 parts.append(self._fileobjs[self._index].read(amt))
129 129 got = len(parts[-1])
130 130 if got < amt:
131 131 self._index += 1
132 132 amt -= got
133 133 return ''.join(parts)
134 134
135 135 def seek(self, offset, whence=os.SEEK_SET):
136 136 if whence != os.SEEK_SET:
137 137 raise NotImplementedError(
138 138 '_multifile does not support anything other'
139 139 ' than os.SEEK_SET for whence on seek()')
140 140 if offset != 0:
141 141 raise NotImplementedError(
142 142 '_multifile only supports seeking to start, but that '
143 143 'could be fixed if you need it')
144 144 for f in self._fileobjs:
145 145 f.seek(0)
146 146 self._index = 0
147 147
148 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 149 repobaseurl, cmd, args):
150 150 """Make an HTTP request to run a command for a version 1 client.
151 151
152 152 ``caps`` is a set of known server capabilities. The value may be
153 153 None if capabilities are not yet known.
154 154
155 155 ``capablefn`` is a function to evaluate a capability.
156 156
157 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 158 raw data to pass to it.
159 159 """
160 160 if cmd == 'pushkey':
161 161 args['data'] = ''
162 162 data = args.pop('data', None)
163 163 headers = args.pop('headers', {})
164 164
165 165 ui.debug("sending %s command\n" % cmd)
166 166 q = [('cmd', cmd)]
167 167 headersize = 0
168 168 # Important: don't use self.capable() here or else you end up
169 169 # with infinite recursion when trying to look up capabilities
170 170 # for the first time.
171 171 postargsok = caps is not None and 'httppostargs' in caps
172 172
173 173 # Send arguments via POST.
174 174 if postargsok and args:
175 175 strargs = urlreq.urlencode(sorted(args.items()))
176 176 if not data:
177 177 data = strargs
178 178 else:
179 179 if isinstance(data, bytes):
180 180 i = io.BytesIO(data)
181 181 i.length = len(data)
182 182 data = i
183 183 argsio = io.BytesIO(strargs)
184 184 argsio.length = len(strargs)
185 185 data = _multifile(argsio, data)
186 186 headers[r'X-HgArgs-Post'] = len(strargs)
187 187 elif args:
188 188 # Calling self.capable() can infinite loop if we are calling
189 189 # "capabilities". But that command should never accept wire
190 190 # protocol arguments. So this should never happen.
191 191 assert cmd != 'capabilities'
192 192 httpheader = capablefn('httpheader')
193 193 if httpheader:
194 194 headersize = int(httpheader.split(',', 1)[0])
195 195
196 196 # Send arguments via HTTP headers.
197 197 if headersize > 0:
198 198 # The headers can typically carry more data than the URL.
199 199 encargs = urlreq.urlencode(sorted(args.items()))
200 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 201 headersize):
202 202 headers[header] = value
203 203 # Send arguments via query string (Mercurial <1.9).
204 204 else:
205 205 q += sorted(args.items())
206 206
207 207 qs = '?%s' % urlreq.urlencode(q)
208 208 cu = "%s%s" % (repobaseurl, qs)
209 209 size = 0
210 210 if util.safehasattr(data, 'length'):
211 211 size = data.length
212 212 elif data is not None:
213 213 size = len(data)
214 214 if data is not None and r'Content-Type' not in headers:
215 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216 216
217 217 # Tell the server we accept application/mercurial-0.2 and multiple
218 218 # compression formats if the server is capable of emitting those
219 219 # payloads.
220 220 # Note: Keep this set empty by default, as client advertisement of
221 221 # protocol parameters should only occur after the handshake.
222 222 protoparams = set()
223 223
224 224 mediatypes = set()
225 225 if caps is not None:
226 226 mt = capablefn('httpmediatype')
227 227 if mt:
228 228 protoparams.add('0.1')
229 229 mediatypes = set(mt.split(','))
230 230
231 231 protoparams.add('partial-pull')
232 232
233 233 if '0.2tx' in mediatypes:
234 234 protoparams.add('0.2')
235 235
236 236 if '0.2tx' in mediatypes and capablefn('compression'):
237 237 # We /could/ compare supported compression formats and prune
238 238 # non-mutually supported or error if nothing is mutually supported.
239 239 # For now, send the full list to the server and have it error.
240 240 comps = [e.wireprotosupport().name for e in
241 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 242 protoparams.add('comp=%s' % ','.join(comps))
243 243
244 244 if protoparams:
245 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 246 'X-HgProto',
247 247 headersize or 1024)
248 248 for header, value in protoheaders:
249 249 headers[header] = value
250 250
251 251 varyheaders = []
252 252 for header in headers:
253 253 if header.lower().startswith(r'x-hg'):
254 254 varyheaders.append(header)
255 255
256 256 if varyheaders:
257 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258 258
259 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260 260
261 261 if data is not None:
262 262 ui.debug("sending %d bytes\n" % size)
263 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264 264
265 265 return req, cu, qs
266 266
267 267 def sendrequest(ui, opener, req):
268 268 """Send a prepared HTTP request.
269 269
270 270 Returns the response object.
271 271 """
272 272 if (ui.debugflag
273 273 and ui.configbool('devel', 'debug.peer-request')):
274 274 dbg = ui.debug
275 275 line = 'devel-peer-request: %s\n'
276 276 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
277 277 hgargssize = None
278 278
279 279 for header, value in sorted(req.header_items()):
280 280 if header.startswith('X-hgarg-'):
281 281 if hgargssize is None:
282 282 hgargssize = 0
283 283 hgargssize += len(value)
284 284 else:
285 285 dbg(line % ' %s %s' % (header, value))
286 286
287 287 if hgargssize is not None:
288 288 dbg(line % ' %d bytes of commands arguments in headers'
289 289 % hgargssize)
290 290
291 291 if req.has_data():
292 292 data = req.get_data()
293 293 length = getattr(data, 'length', None)
294 294 if length is None:
295 295 length = len(data)
296 296 dbg(line % ' %d bytes of data' % length)
297 297
298 298 start = util.timer()
299 299
300 300 try:
301 301 res = opener.open(req)
302 302 except urlerr.httperror as inst:
303 303 if inst.code == 401:
304 304 raise error.Abort(_('authorization failed'))
305 305 raise
306 306 except httplib.HTTPException as inst:
307 307 ui.debug('http error requesting %s\n' %
308 308 util.hidepassword(req.get_full_url()))
309 309 ui.traceback()
310 310 raise IOError(None, inst)
311 311 finally:
312 312 if ui.configbool('devel', 'debug.peer-request'):
313 313 dbg(line % ' finished in %.4f seconds (%s)'
314 314 % (util.timer() - start, res.code))
315 315
316 316 # Insert error handlers for common I/O failures.
317 317 _wraphttpresponse(res)
318 318
319 319 return res
320 320
321 321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
322 322 allowcbor=False):
323 323 # record the url we got redirected to
324 324 respurl = pycompat.bytesurl(resp.geturl())
325 325 if respurl.endswith(qs):
326 326 respurl = respurl[:-len(qs)]
327 327 if baseurl.rstrip('/') != respurl.rstrip('/'):
328 328 if not ui.quiet:
329 329 ui.warn(_('real URL is %s\n') % respurl)
330 330
331 331 try:
332 332 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
333 333 except AttributeError:
334 334 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
335 335
336 336 safeurl = util.hidepassword(baseurl)
337 337 if proto.startswith('application/hg-error'):
338 338 raise error.OutOfBandError(resp.read())
339 339
340 340 # Pre 1.0 versions of Mercurial used text/plain and
341 341 # application/hg-changegroup. We don't support such old servers.
342 342 if not proto.startswith('application/mercurial-'):
343 343 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
344 344 raise error.RepoError(
345 345 _("'%s' does not appear to be an hg repository:\n"
346 346 "---%%<--- (%s)\n%s\n---%%<---\n")
347 347 % (safeurl, proto or 'no content-type', resp.read(1024)))
348 348
349 349 try:
350 350 subtype = proto.split('-', 1)[1]
351 351
352 352 # Unless we end up supporting CBOR in the legacy wire protocol,
353 353 # this should ONLY be encountered for the initial capabilities
354 354 # request during handshake.
355 355 if subtype == 'cbor':
356 356 if allowcbor:
357 357 return respurl, proto, resp
358 358 else:
359 359 raise error.RepoError(_('unexpected CBOR response from '
360 360 'server'))
361 361
362 362 version_info = tuple([int(n) for n in subtype.split('.')])
363 363 except ValueError:
364 364 raise error.RepoError(_("'%s' sent a broken Content-Type "
365 365 "header (%s)") % (safeurl, proto))
366 366
367 367 # TODO consider switching to a decompression reader that uses
368 368 # generators.
369 369 if version_info == (0, 1):
370 370 if compressible:
371 371 resp = util.compengines['zlib'].decompressorreader(resp)
372 372
373 373 elif version_info == (0, 2):
374 374 # application/mercurial-0.2 always identifies the compression
375 375 # engine in the payload header.
376 376 elen = struct.unpack('B', resp.read(1))[0]
377 377 ename = resp.read(elen)
378 378 engine = util.compengines.forwiretype(ename)
379 379
380 380 resp = engine.decompressorreader(resp)
381 381 else:
382 382 raise error.RepoError(_("'%s' uses newer protocol %s") %
383 383 (safeurl, subtype))
384 384
385 385 return respurl, proto, resp
386 386
387 387 class httppeer(wireprotov1peer.wirepeer):
388 388 def __init__(self, ui, path, url, opener, requestbuilder, caps):
389 389 self.ui = ui
390 390 self._path = path
391 391 self._url = url
392 392 self._caps = caps
393 393 self._urlopener = opener
394 394 self._requestbuilder = requestbuilder
395 395
396 396 def __del__(self):
397 397 for h in self._urlopener.handlers:
398 398 h.close()
399 399 getattr(h, "close_all", lambda: None)()
400 400
401 401 # Begin of ipeerconnection interface.
402 402
403 403 def url(self):
404 404 return self._path
405 405
406 406 def local(self):
407 407 return None
408 408
409 409 def peer(self):
410 410 return self
411 411
412 412 def canpush(self):
413 413 return True
414 414
415 415 def close(self):
416 416 pass
417 417
418 418 # End of ipeerconnection interface.
419 419
420 420 # Begin of ipeercommands interface.
421 421
422 422 def capabilities(self):
423 423 return self._caps
424 424
425 425 # End of ipeercommands interface.
426 426
427 427 # look up capabilities only when needed
428 428
429 429 def _callstream(self, cmd, _compressible=False, **args):
430 430 args = pycompat.byteskwargs(args)
431 431
432 432 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
433 433 self._caps, self.capable,
434 434 self._url, cmd, args)
435 435
436 436 resp = sendrequest(self.ui, self._urlopener, req)
437 437
438 438 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
439 439 resp, _compressible)
440 440
441 441 return resp
442 442
443 443 def _call(self, cmd, **args):
444 444 fp = self._callstream(cmd, **args)
445 445 try:
446 446 return fp.read()
447 447 finally:
448 448 # if using keepalive, allow connection to be reused
449 449 fp.close()
450 450
451 451 def _callpush(self, cmd, cg, **args):
452 452 # have to stream bundle to a temp file because we do not have
453 453 # http 1.1 chunked transfer.
454 454
455 455 types = self.capable('unbundle')
456 456 try:
457 457 types = types.split(',')
458 458 except AttributeError:
459 459 # servers older than d1b16a746db6 will send 'unbundle' as a
460 460 # boolean capability. They only support headerless/uncompressed
461 461 # bundles.
462 462 types = [""]
463 463 for x in types:
464 464 if x in bundle2.bundletypes:
465 465 type = x
466 466 break
467 467
468 468 tempname = bundle2.writebundle(self.ui, cg, None, type)
469 469 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
470 470 headers = {r'Content-Type': r'application/mercurial-0.1'}
471 471
472 472 try:
473 473 r = self._call(cmd, data=fp, headers=headers, **args)
474 474 vals = r.split('\n', 1)
475 475 if len(vals) < 2:
476 476 raise error.ResponseError(_("unexpected response:"), r)
477 477 return vals
478 478 except urlerr.httperror:
479 479 # Catch and re-raise these so we don't try and treat them
480 480 # like generic socket errors. They lack any values in
481 481 # .args on Python 3 which breaks our socket.error block.
482 482 raise
483 483 except socket.error as err:
484 484 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
485 485 raise error.Abort(_('push failed: %s') % err.args[1])
486 486 raise error.Abort(err.args[1])
487 487 finally:
488 488 fp.close()
489 489 os.unlink(tempname)
490 490
491 491 def _calltwowaystream(self, cmd, fp, **args):
492 492 fh = None
493 493 fp_ = None
494 494 filename = None
495 495 try:
496 496 # dump bundle to disk
497 497 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
498 498 fh = os.fdopen(fd, r"wb")
499 499 d = fp.read(4096)
500 500 while d:
501 501 fh.write(d)
502 502 d = fp.read(4096)
503 503 fh.close()
504 504 # start http push
505 505 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
506 506 headers = {r'Content-Type': r'application/mercurial-0.1'}
507 507 return self._callstream(cmd, data=fp_, headers=headers, **args)
508 508 finally:
509 509 if fp_ is not None:
510 510 fp_.close()
511 511 if fh is not None:
512 512 fh.close()
513 513 os.unlink(filename)
514 514
515 515 def _callcompressable(self, cmd, **args):
516 516 return self._callstream(cmd, _compressible=True, **args)
517 517
518 518 def _abort(self, exception):
519 519 raise exception
520 520
521 521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
522 522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
523 523 buffersends=True)
524 524
525 525 url = '%s/%s' % (apiurl, permission)
526 526
527 527 if len(requests) > 1:
528 528 url += '/multirequest'
529 529 else:
530 530 url += '/%s' % requests[0][0]
531 531
532 532 # Request ID to (request, future)
533 533 requestmap = {}
534 534
535 535 for command, args, f in requests:
536 536 request, action, meta = reactor.callcommand(command, args)
537 537 assert action == 'noop'
538 538
539 539 requestmap[request.requestid] = (request, f)
540 540
541 541 action, meta = reactor.flushcommands()
542 542 assert action == 'sendframes'
543 543
544 544 # TODO stream this.
545 545 body = b''.join(map(bytes, meta['framegen']))
546 546
547 547 # TODO modify user-agent to reflect v2
548 548 headers = {
549 549 r'Accept': wireprotov2server.FRAMINGTYPE,
550 550 r'Content-Type': wireprotov2server.FRAMINGTYPE,
551 551 }
552 552
553 553 req = requestbuilder(pycompat.strurl(url), body, headers)
554 554 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
555 555
556 556 try:
557 557 res = opener.open(req)
558 558 except urlerr.httperror as e:
559 559 if e.code == 401:
560 560 raise error.Abort(_('authorization failed'))
561 561
562 562 raise
563 563 except httplib.HTTPException as e:
564 564 ui.traceback()
565 565 raise IOError(None, e)
566 566
567 567 return reactor, requestmap, res
568 568
569 569 class queuedcommandfuture(pycompat.futures.Future):
570 570 """Wraps result() on command futures to trigger submission on call."""
571 571
572 572 def result(self, timeout=None):
573 573 if self.done():
574 574 return pycompat.futures.Future.result(self, timeout)
575 575
576 576 self._peerexecutor.sendcommands()
577 577
578 578 # sendcommands() will restore the original __class__ and self.result
579 579 # will resolve to Future.result.
580 580 return self.result(timeout)
581 581
582 582 @zi.implementer(repository.ipeercommandexecutor)
583 583 class httpv2executor(object):
584 584 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
585 585 self._ui = ui
586 586 self._opener = opener
587 587 self._requestbuilder = requestbuilder
588 588 self._apiurl = apiurl
589 589 self._descriptor = descriptor
590 590 self._sent = False
591 591 self._closed = False
592 592 self._neededpermissions = set()
593 593 self._calls = []
594 594 self._futures = weakref.WeakSet()
595 595 self._responseexecutor = None
596 596 self._responsef = None
597 597
598 598 def __enter__(self):
599 599 return self
600 600
601 601 def __exit__(self, exctype, excvalue, exctb):
602 602 self.close()
603 603
604 604 def callcommand(self, command, args):
605 605 if self._sent:
606 606 raise error.ProgrammingError('callcommand() cannot be used after '
607 607 'commands are sent')
608 608
609 609 if self._closed:
610 610 raise error.ProgrammingError('callcommand() cannot be used after '
611 611 'close()')
612 612
613 613 # The service advertises which commands are available. So if we attempt
614 614 # to call an unknown command or pass an unknown argument, we can screen
615 615 # for this.
616 616 if command not in self._descriptor['commands']:
617 617 raise error.ProgrammingError(
618 618 'wire protocol command %s is not available' % command)
619 619
620 620 cmdinfo = self._descriptor['commands'][command]
621 621 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
622 622
623 623 if unknownargs:
624 624 raise error.ProgrammingError(
625 625 'wire protocol command %s does not accept argument: %s' % (
626 626 command, ', '.join(sorted(unknownargs))))
627 627
628 628 self._neededpermissions |= set(cmdinfo['permissions'])
629 629
630 630 # TODO we /could/ also validate types here, since the API descriptor
631 631 # includes types...
632 632
633 633 f = pycompat.futures.Future()
634 634
635 635 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
636 636 # could deadlock.
637 637 f.__class__ = queuedcommandfuture
638 638 f._peerexecutor = self
639 639
640 640 self._futures.add(f)
641 641 self._calls.append((command, args, f))
642 642
643 643 return f
644 644
645 645 def sendcommands(self):
646 646 if self._sent:
647 647 return
648 648
649 649 if not self._calls:
650 650 return
651 651
652 652 self._sent = True
653 653
654 654 # Unhack any future types so caller sees a clean type and so we
655 655 # break reference cycle.
656 656 for f in self._futures:
657 657 if isinstance(f, queuedcommandfuture):
658 658 f.__class__ = pycompat.futures.Future
659 659 f._peerexecutor = None
660 660
661 661 # Mark the future as running and filter out cancelled futures.
662 662 calls = [(command, args, f)
663 663 for command, args, f in self._calls
664 664 if f.set_running_or_notify_cancel()]
665 665
666 666 # Clear out references, prevent improper object usage.
667 667 self._calls = None
668 668
669 669 if not calls:
670 670 return
671 671
672 672 permissions = set(self._neededpermissions)
673 673
674 674 if 'push' in permissions and 'pull' in permissions:
675 675 permissions.remove('pull')
676 676
677 677 if len(permissions) > 1:
678 678 raise error.RepoError(_('cannot make request requiring multiple '
679 679 'permissions: %s') %
680 680 _(', ').join(sorted(permissions)))
681 681
682 682 permission = {
683 683 'push': 'rw',
684 684 'pull': 'ro',
685 685 }[permissions.pop()]
686 686
687 687 reactor, requests, resp = sendv2request(
688 688 self._ui, self._opener, self._requestbuilder, self._apiurl,
689 689 permission, calls)
690 690
691 691 # TODO we probably want to validate the HTTP code, media type, etc.
692 692
693 693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
694 694 self._responsef = self._responseexecutor.submit(self._handleresponse,
695 695 reactor,
696 696 requests,
697 697 resp)
698 698
699 699 def close(self):
700 700 if self._closed:
701 701 return
702 702
703 703 self.sendcommands()
704 704
705 705 self._closed = True
706 706
707 707 if not self._responsef:
708 708 return
709 709
710 710 try:
711 711 self._responsef.result()
712 712 finally:
713 713 self._responseexecutor.shutdown(wait=True)
714 714 self._responsef = None
715 715 self._responseexecutor = None
716 716
717 717 # If any of our futures are still in progress, mark them as
718 718 # errored, otherwise a result() could wait indefinitely.
719 719 for f in self._futures:
720 720 if not f.done():
721 721 f.set_exception(error.ResponseError(
722 722 _('unfulfilled command response')))
723 723
724 724 self._futures = None
725 725
726 726 def _handleresponse(self, reactor, requests, resp):
727 727 # Called in a thread to read the response.
728 728
729 729 results = {k: [] for k in requests}
730 730
731 731 while True:
732 732 frame = wireprotoframing.readframe(resp)
733 733 if frame is None:
734 734 break
735 735
736 736 self._ui.note(_('received %r\n') % frame)
737 737
738 738 # Guard against receiving a frame with a request ID that we
739 739 # didn't issue. This should never happen.
740 740 request, f = requests.get(frame.requestid, [None, None])
741 741
742 742 action, meta = reactor.onframerecv(frame)
743 743
744 744 if action == 'responsedata':
745 745 assert request.requestid == meta['request'].requestid
746 746
747 747 result = results[request.requestid]
748 748
749 749 if meta['cbor']:
750 750 payload = util.bytesio(meta['data'])
751 751
752 752 decoder = cbor.CBORDecoder(payload)
753 753 while payload.tell() + 1 < len(meta['data']):
754 754 try:
755 755 result.append(decoder.decode())
756 756 except Exception:
757 757 f.set_exception_info(*sys.exc_info()[1:])
758 758 continue
759 759 else:
760 760 result.append(meta['data'])
761 761
762 762 if meta['eos']:
763 763 f.set_result(result)
764 764 del results[request.requestid]
765 765
766 elif action == 'error':
767 e = error.RepoError(meta['message'])
768
769 if f:
770 f.set_exception(e)
771 else:
772 raise e
773
766 774 else:
767 775 e = error.ProgrammingError('unhandled action: %s' % action)
768 776
769 777 if f:
770 778 f.set_exception(e)
771 779 else:
772 780 raise e
773 781
774 782 # TODO implement interface for version 2 peers
775 783 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
776 784 repository.ipeerrequests)
777 785 class httpv2peer(object):
778 786 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
779 787 apidescriptor):
780 788 self.ui = ui
781 789
782 790 if repourl.endswith('/'):
783 791 repourl = repourl[:-1]
784 792
785 793 self._url = repourl
786 794 self._apipath = apipath
787 795 self._apiurl = '%s/%s' % (repourl, apipath)
788 796 self._opener = opener
789 797 self._requestbuilder = requestbuilder
790 798 self._descriptor = apidescriptor
791 799
792 800 # Start of ipeerconnection.
793 801
794 802 def url(self):
795 803 return self._url
796 804
797 805 def local(self):
798 806 return None
799 807
800 808 def peer(self):
801 809 return self
802 810
803 811 def canpush(self):
804 812 # TODO change once implemented.
805 813 return False
806 814
807 815 def close(self):
808 816 pass
809 817
810 818 # End of ipeerconnection.
811 819
812 820 # Start of ipeercapabilities.
813 821
814 822 def capable(self, name):
815 823 # The capabilities used internally historically map to capabilities
816 824 # advertised from the "capabilities" wire protocol command. However,
817 825 # version 2 of that command works differently.
818 826
819 827 # Maps to commands that are available.
820 828 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
821 829 return True
822 830
823 831 # Other concepts.
824 832 if name in ('bundle2',):
825 833 return True
826 834
827 835 return False
828 836
829 837 def requirecap(self, name, purpose):
830 838 if self.capable(name):
831 839 return
832 840
833 841 raise error.CapabilityError(
834 842 _('cannot %s; client or remote repository does not support the %r '
835 843 'capability') % (purpose, name))
836 844
837 845 # End of ipeercapabilities.
838 846
839 847 def _call(self, name, **args):
840 848 with self.commandexecutor() as e:
841 849 return e.callcommand(name, args).result()
842 850
843 851 def commandexecutor(self):
844 852 return httpv2executor(self.ui, self._opener, self._requestbuilder,
845 853 self._apiurl, self._descriptor)
846 854
847 855 # Registry of API service names to metadata about peers that handle it.
848 856 #
849 857 # The following keys are meaningful:
850 858 #
851 859 # init
852 860 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
853 861 # apidescriptor) to create a peer.
854 862 #
855 863 # priority
856 864 # Integer priority for the service. If we could choose from multiple
857 865 # services, we choose the one with the highest priority.
858 866 API_PEERS = {
859 867 wireprototypes.HTTP_WIREPROTO_V2: {
860 868 'init': httpv2peer,
861 869 'priority': 50,
862 870 },
863 871 }
864 872
865 873 def performhandshake(ui, url, opener, requestbuilder):
866 874 # The handshake is a request to the capabilities command.
867 875
868 876 caps = None
869 877 def capable(x):
870 878 raise error.ProgrammingError('should not be called')
871 879
872 880 args = {}
873 881
874 882 # The client advertises support for newer protocols by adding an
875 883 # X-HgUpgrade-* header with a list of supported APIs and an
876 884 # X-HgProto-* header advertising which serializing formats it supports.
877 885 # We only support the HTTP version 2 transport and CBOR responses for
878 886 # now.
879 887 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
880 888
881 889 if advertisev2:
882 890 args['headers'] = {
883 891 r'X-HgProto-1': r'cbor',
884 892 }
885 893
886 894 args['headers'].update(
887 895 encodevalueinheaders(' '.join(sorted(API_PEERS)),
888 896 'X-HgUpgrade',
889 897 # We don't know the header limit this early.
890 898 # So make it small.
891 899 1024))
892 900
893 901 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
894 902 capable, url, 'capabilities',
895 903 args)
896 904
897 905 resp = sendrequest(ui, opener, req)
898 906
899 907 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
900 908 compressible=False,
901 909 allowcbor=advertisev2)
902 910
903 911 try:
904 912 rawdata = resp.read()
905 913 finally:
906 914 resp.close()
907 915
908 916 if not ct.startswith('application/mercurial-'):
909 917 raise error.ProgrammingError('unexpected content-type: %s' % ct)
910 918
911 919 if advertisev2:
912 920 if ct == 'application/mercurial-cbor':
913 921 try:
914 922 info = cbor.loads(rawdata)
915 923 except cbor.CBORDecodeError:
916 924 raise error.Abort(_('error decoding CBOR from remote server'),
917 925 hint=_('try again and consider contacting '
918 926 'the server operator'))
919 927
920 928 # We got a legacy response. That's fine.
921 929 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
922 930 info = {
923 931 'v1capabilities': set(rawdata.split())
924 932 }
925 933
926 934 else:
927 935 raise error.RepoError(
928 936 _('unexpected response type from server: %s') % ct)
929 937 else:
930 938 info = {
931 939 'v1capabilities': set(rawdata.split())
932 940 }
933 941
934 942 return respurl, info
935 943
936 944 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
937 945 """Construct an appropriate HTTP peer instance.
938 946
939 947 ``opener`` is an ``url.opener`` that should be used to establish
940 948 connections, perform HTTP requests.
941 949
942 950 ``requestbuilder`` is the type used for constructing HTTP requests.
943 951 It exists as an argument so extensions can override the default.
944 952 """
945 953 u = util.url(path)
946 954 if u.query or u.fragment:
947 955 raise error.Abort(_('unsupported URL component: "%s"') %
948 956 (u.query or u.fragment))
949 957
950 958 # urllib cannot handle URLs with embedded user or passwd.
951 959 url, authinfo = u.authinfo()
952 960 ui.debug('using %s\n' % url)
953 961
954 962 opener = opener or urlmod.opener(ui, authinfo)
955 963
956 964 respurl, info = performhandshake(ui, url, opener, requestbuilder)
957 965
958 966 # Given the intersection of APIs that both we and the server support,
959 967 # sort by their advertised priority and pick the first one.
960 968 #
961 969 # TODO consider making this request-based and interface driven. For
962 970 # example, the caller could say "I want a peer that does X." It's quite
963 971 # possible that not all peers would do that. Since we know the service
964 972 # capabilities, we could filter out services not meeting the
965 973 # requirements. Possibly by consulting the interfaces defined by the
966 974 # peer type.
967 975 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
968 976
969 977 preferredchoices = sorted(apipeerchoices,
970 978 key=lambda x: API_PEERS[x]['priority'],
971 979 reverse=True)
972 980
973 981 for service in preferredchoices:
974 982 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
975 983
976 984 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
977 985 requestbuilder,
978 986 info['apis'][service])
979 987
980 988 # Failed to construct an API peer. Fall back to legacy.
981 989 return httppeer(ui, path, respurl, opener, requestbuilder,
982 990 info['v1capabilities'])
983 991
984 992 def instance(ui, path, create):
985 993 if create:
986 994 raise error.Abort(_('cannot create new http repository'))
987 995 try:
988 996 if path.startswith('https:') and not urlmod.has_https:
989 997 raise error.Abort(_('Python support for SSL and HTTPS '
990 998 'is not installed'))
991 999
992 1000 inst = makepeer(ui, path)
993 1001
994 1002 return inst
995 1003 except error.RepoError as httpexception:
996 1004 try:
997 1005 r = statichttprepo.instance(ui, "static-" + path, create)
998 1006 ui.note(_('(falling back to static-http)\n'))
999 1007 return r
1000 1008 except error.RepoError:
1001 1009 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now