##// END OF EJS Templates
httppeer: add TODO about delayed handling of ^C...
Gregory Szorc -
r39468:3c6f7eeb default
parent child Browse files
Show More
@@ -1,1001 +1,1003 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from .thirdparty import (
20 20 cbor,
21 21 )
22 22 from . import (
23 23 bundle2,
24 24 error,
25 25 httpconnection,
26 26 pycompat,
27 27 repository,
28 28 statichttprepo,
29 29 url as urlmod,
30 30 util,
31 31 wireprotoframing,
32 32 wireprototypes,
33 33 wireprotov1peer,
34 34 wireprotov2peer,
35 35 wireprotov2server,
36 36 )
37 37 from .utils import (
38 38 interfaceutil,
39 39 stringutil,
40 40 )
41 41
42 42 httplib = util.httplib
43 43 urlerr = util.urlerr
44 44 urlreq = util.urlreq
45 45
46 46 def encodevalueinheaders(value, header, limit):
47 47 """Encode a string value into multiple HTTP headers.
48 48
49 49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 51 name + value will be at most ``limit`` bytes long.
52 52
53 53 Returns an iterable of 2-tuples consisting of header names and
54 54 values as native strings.
55 55 """
56 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 57 # not bytes. This function always takes bytes in as arguments.
58 58 fmt = pycompat.strurl(header) + r'-%s'
59 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 60 # and not a unicode: we're just getting the encoded length anyway,
61 61 # and using an r-string to make it portable between Python 2 and 3
62 62 # doesn't work because then the \r is a literal backslash-r
63 63 # instead of a carriage return.
64 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 65 result = []
66 66
67 67 n = 0
68 68 for i in pycompat.xrange(0, len(value), valuelen):
69 69 n += 1
70 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 71
72 72 return result
73 73
74 74 def _wraphttpresponse(resp):
75 75 """Wrap an HTTPResponse with common error handlers.
76 76
77 77 This ensures that any I/O from any consumer raises the appropriate
78 78 error and messaging.
79 79 """
80 80 origread = resp.read
81 81
82 82 class readerproxy(resp.__class__):
83 83 def read(self, size=None):
84 84 try:
85 85 return origread(size)
86 86 except httplib.IncompleteRead as e:
87 87 # e.expected is an integer if length known or None otherwise.
88 88 if e.expected:
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (e.expected,
91 91 len(e.partial))
92 92 else:
93 93 msg = _('HTTP request error (incomplete response)')
94 94
95 95 raise error.PeerTransportError(
96 96 msg,
97 97 hint=_('this may be an intermittent network failure; '
98 98 'if the error persists, consider contacting the '
99 99 'network or server operator'))
100 100 except httplib.HTTPException as e:
101 101 raise error.PeerTransportError(
102 102 _('HTTP request error (%s)') % e,
103 103 hint=_('this may be an intermittent network failure; '
104 104 'if the error persists, consider contacting the '
105 105 'network or server operator'))
106 106
107 107 resp.__class__ = readerproxy
108 108
109 109 class _multifile(object):
110 110 def __init__(self, *fileobjs):
111 111 for f in fileobjs:
112 112 if not util.safehasattr(f, 'length'):
113 113 raise ValueError(
114 114 '_multifile only supports file objects that '
115 115 'have a length but this one does not:', type(f), f)
116 116 self._fileobjs = fileobjs
117 117 self._index = 0
118 118
119 119 @property
120 120 def length(self):
121 121 return sum(f.length for f in self._fileobjs)
122 122
123 123 def read(self, amt=None):
124 124 if amt <= 0:
125 125 return ''.join(f.read() for f in self._fileobjs)
126 126 parts = []
127 127 while amt and self._index < len(self._fileobjs):
128 128 parts.append(self._fileobjs[self._index].read(amt))
129 129 got = len(parts[-1])
130 130 if got < amt:
131 131 self._index += 1
132 132 amt -= got
133 133 return ''.join(parts)
134 134
135 135 def seek(self, offset, whence=os.SEEK_SET):
136 136 if whence != os.SEEK_SET:
137 137 raise NotImplementedError(
138 138 '_multifile does not support anything other'
139 139 ' than os.SEEK_SET for whence on seek()')
140 140 if offset != 0:
141 141 raise NotImplementedError(
142 142 '_multifile only supports seeking to start, but that '
143 143 'could be fixed if you need it')
144 144 for f in self._fileobjs:
145 145 f.seek(0)
146 146 self._index = 0
147 147
148 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 149 repobaseurl, cmd, args):
150 150 """Make an HTTP request to run a command for a version 1 client.
151 151
152 152 ``caps`` is a set of known server capabilities. The value may be
153 153 None if capabilities are not yet known.
154 154
155 155 ``capablefn`` is a function to evaluate a capability.
156 156
157 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 158 raw data to pass to it.
159 159 """
160 160 if cmd == 'pushkey':
161 161 args['data'] = ''
162 162 data = args.pop('data', None)
163 163 headers = args.pop('headers', {})
164 164
165 165 ui.debug("sending %s command\n" % cmd)
166 166 q = [('cmd', cmd)]
167 167 headersize = 0
168 168 # Important: don't use self.capable() here or else you end up
169 169 # with infinite recursion when trying to look up capabilities
170 170 # for the first time.
171 171 postargsok = caps is not None and 'httppostargs' in caps
172 172
173 173 # Send arguments via POST.
174 174 if postargsok and args:
175 175 strargs = urlreq.urlencode(sorted(args.items()))
176 176 if not data:
177 177 data = strargs
178 178 else:
179 179 if isinstance(data, bytes):
180 180 i = io.BytesIO(data)
181 181 i.length = len(data)
182 182 data = i
183 183 argsio = io.BytesIO(strargs)
184 184 argsio.length = len(strargs)
185 185 data = _multifile(argsio, data)
186 186 headers[r'X-HgArgs-Post'] = len(strargs)
187 187 elif args:
188 188 # Calling self.capable() can infinite loop if we are calling
189 189 # "capabilities". But that command should never accept wire
190 190 # protocol arguments. So this should never happen.
191 191 assert cmd != 'capabilities'
192 192 httpheader = capablefn('httpheader')
193 193 if httpheader:
194 194 headersize = int(httpheader.split(',', 1)[0])
195 195
196 196 # Send arguments via HTTP headers.
197 197 if headersize > 0:
198 198 # The headers can typically carry more data than the URL.
199 199 encargs = urlreq.urlencode(sorted(args.items()))
200 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 201 headersize):
202 202 headers[header] = value
203 203 # Send arguments via query string (Mercurial <1.9).
204 204 else:
205 205 q += sorted(args.items())
206 206
207 207 qs = '?%s' % urlreq.urlencode(q)
208 208 cu = "%s%s" % (repobaseurl, qs)
209 209 size = 0
210 210 if util.safehasattr(data, 'length'):
211 211 size = data.length
212 212 elif data is not None:
213 213 size = len(data)
214 214 if data is not None and r'Content-Type' not in headers:
215 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216 216
217 217 # Tell the server we accept application/mercurial-0.2 and multiple
218 218 # compression formats if the server is capable of emitting those
219 219 # payloads.
220 220 # Note: Keep this set empty by default, as client advertisement of
221 221 # protocol parameters should only occur after the handshake.
222 222 protoparams = set()
223 223
224 224 mediatypes = set()
225 225 if caps is not None:
226 226 mt = capablefn('httpmediatype')
227 227 if mt:
228 228 protoparams.add('0.1')
229 229 mediatypes = set(mt.split(','))
230 230
231 231 protoparams.add('partial-pull')
232 232
233 233 if '0.2tx' in mediatypes:
234 234 protoparams.add('0.2')
235 235
236 236 if '0.2tx' in mediatypes and capablefn('compression'):
237 237 # We /could/ compare supported compression formats and prune
238 238 # non-mutually supported or error if nothing is mutually supported.
239 239 # For now, send the full list to the server and have it error.
240 240 comps = [e.wireprotosupport().name for e in
241 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 242 protoparams.add('comp=%s' % ','.join(comps))
243 243
244 244 if protoparams:
245 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 246 'X-HgProto',
247 247 headersize or 1024)
248 248 for header, value in protoheaders:
249 249 headers[header] = value
250 250
251 251 varyheaders = []
252 252 for header in headers:
253 253 if header.lower().startswith(r'x-hg'):
254 254 varyheaders.append(header)
255 255
256 256 if varyheaders:
257 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258 258
259 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260 260
261 261 if data is not None:
262 262 ui.debug("sending %d bytes\n" % size)
263 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264 264
265 265 return req, cu, qs
266 266
267 267 def _reqdata(req):
268 268 """Get request data, if any. If no data, returns None."""
269 269 if pycompat.ispy3:
270 270 return req.data
271 271 if not req.has_data():
272 272 return None
273 273 return req.get_data()
274 274
275 275 def sendrequest(ui, opener, req):
276 276 """Send a prepared HTTP request.
277 277
278 278 Returns the response object.
279 279 """
280 280 dbg = ui.debug
281 281 if (ui.debugflag
282 282 and ui.configbool('devel', 'debug.peer-request')):
283 283 line = 'devel-peer-request: %s\n'
284 284 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
285 285 pycompat.bytesurl(req.get_full_url())))
286 286 hgargssize = None
287 287
288 288 for header, value in sorted(req.header_items()):
289 289 header = pycompat.bytesurl(header)
290 290 value = pycompat.bytesurl(value)
291 291 if header.startswith('X-hgarg-'):
292 292 if hgargssize is None:
293 293 hgargssize = 0
294 294 hgargssize += len(value)
295 295 else:
296 296 dbg(line % ' %s %s' % (header, value))
297 297
298 298 if hgargssize is not None:
299 299 dbg(line % ' %d bytes of commands arguments in headers'
300 300 % hgargssize)
301 301 data = _reqdata(req)
302 302 if data is not None:
303 303 length = getattr(data, 'length', None)
304 304 if length is None:
305 305 length = len(data)
306 306 dbg(line % ' %d bytes of data' % length)
307 307
308 308 start = util.timer()
309 309
310 310 res = None
311 311 try:
312 312 res = opener.open(req)
313 313 except urlerr.httperror as inst:
314 314 if inst.code == 401:
315 315 raise error.Abort(_('authorization failed'))
316 316 raise
317 317 except httplib.HTTPException as inst:
318 318 ui.debug('http error requesting %s\n' %
319 319 util.hidepassword(req.get_full_url()))
320 320 ui.traceback()
321 321 raise IOError(None, inst)
322 322 finally:
323 323 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
324 324 code = res.code if res else -1
325 325 dbg(line % ' finished in %.4f seconds (%d)'
326 326 % (util.timer() - start, code))
327 327
328 328 # Insert error handlers for common I/O failures.
329 329 _wraphttpresponse(res)
330 330
331 331 return res
332 332
333 333 class RedirectedRepoError(error.RepoError):
334 334 def __init__(self, msg, respurl):
335 335 super(RedirectedRepoError, self).__init__(msg)
336 336 self.respurl = respurl
337 337
338 338 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
339 339 allowcbor=False):
340 340 # record the url we got redirected to
341 341 redirected = False
342 342 respurl = pycompat.bytesurl(resp.geturl())
343 343 if respurl.endswith(qs):
344 344 respurl = respurl[:-len(qs)]
345 345 qsdropped = False
346 346 else:
347 347 qsdropped = True
348 348
349 349 if baseurl.rstrip('/') != respurl.rstrip('/'):
350 350 redirected = True
351 351 if not ui.quiet:
352 352 ui.warn(_('real URL is %s\n') % respurl)
353 353
354 354 try:
355 355 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
356 356 except AttributeError:
357 357 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
358 358
359 359 safeurl = util.hidepassword(baseurl)
360 360 if proto.startswith('application/hg-error'):
361 361 raise error.OutOfBandError(resp.read())
362 362
363 363 # Pre 1.0 versions of Mercurial used text/plain and
364 364 # application/hg-changegroup. We don't support such old servers.
365 365 if not proto.startswith('application/mercurial-'):
366 366 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
367 367 msg = _("'%s' does not appear to be an hg repository:\n"
368 368 "---%%<--- (%s)\n%s\n---%%<---\n") % (
369 369 safeurl, proto or 'no content-type', resp.read(1024))
370 370
371 371 # Some servers may strip the query string from the redirect. We
372 372 # raise a special error type so callers can react to this specially.
373 373 if redirected and qsdropped:
374 374 raise RedirectedRepoError(msg, respurl)
375 375 else:
376 376 raise error.RepoError(msg)
377 377
378 378 try:
379 379 subtype = proto.split('-', 1)[1]
380 380
381 381 # Unless we end up supporting CBOR in the legacy wire protocol,
382 382 # this should ONLY be encountered for the initial capabilities
383 383 # request during handshake.
384 384 if subtype == 'cbor':
385 385 if allowcbor:
386 386 return respurl, proto, resp
387 387 else:
388 388 raise error.RepoError(_('unexpected CBOR response from '
389 389 'server'))
390 390
391 391 version_info = tuple([int(n) for n in subtype.split('.')])
392 392 except ValueError:
393 393 raise error.RepoError(_("'%s' sent a broken Content-Type "
394 394 "header (%s)") % (safeurl, proto))
395 395
396 396 # TODO consider switching to a decompression reader that uses
397 397 # generators.
398 398 if version_info == (0, 1):
399 399 if compressible:
400 400 resp = util.compengines['zlib'].decompressorreader(resp)
401 401
402 402 elif version_info == (0, 2):
403 403 # application/mercurial-0.2 always identifies the compression
404 404 # engine in the payload header.
405 405 elen = struct.unpack('B', resp.read(1))[0]
406 406 ename = resp.read(elen)
407 407 engine = util.compengines.forwiretype(ename)
408 408
409 409 resp = engine.decompressorreader(resp)
410 410 else:
411 411 raise error.RepoError(_("'%s' uses newer protocol %s") %
412 412 (safeurl, subtype))
413 413
414 414 return respurl, proto, resp
415 415
416 416 class httppeer(wireprotov1peer.wirepeer):
417 417 def __init__(self, ui, path, url, opener, requestbuilder, caps):
418 418 self.ui = ui
419 419 self._path = path
420 420 self._url = url
421 421 self._caps = caps
422 422 self._urlopener = opener
423 423 self._requestbuilder = requestbuilder
424 424
425 425 def __del__(self):
426 426 for h in self._urlopener.handlers:
427 427 h.close()
428 428 getattr(h, "close_all", lambda: None)()
429 429
430 430 # Begin of ipeerconnection interface.
431 431
432 432 def url(self):
433 433 return self._path
434 434
435 435 def local(self):
436 436 return None
437 437
438 438 def peer(self):
439 439 return self
440 440
441 441 def canpush(self):
442 442 return True
443 443
444 444 def close(self):
445 445 pass
446 446
447 447 # End of ipeerconnection interface.
448 448
449 449 # Begin of ipeercommands interface.
450 450
451 451 def capabilities(self):
452 452 return self._caps
453 453
454 454 # End of ipeercommands interface.
455 455
456 456 def _callstream(self, cmd, _compressible=False, **args):
457 457 args = pycompat.byteskwargs(args)
458 458
459 459 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
460 460 self._caps, self.capable,
461 461 self._url, cmd, args)
462 462
463 463 resp = sendrequest(self.ui, self._urlopener, req)
464 464
465 465 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
466 466 resp, _compressible)
467 467
468 468 return resp
469 469
470 470 def _call(self, cmd, **args):
471 471 fp = self._callstream(cmd, **args)
472 472 try:
473 473 return fp.read()
474 474 finally:
475 475 # if using keepalive, allow connection to be reused
476 476 fp.close()
477 477
478 478 def _callpush(self, cmd, cg, **args):
479 479 # have to stream bundle to a temp file because we do not have
480 480 # http 1.1 chunked transfer.
481 481
482 482 types = self.capable('unbundle')
483 483 try:
484 484 types = types.split(',')
485 485 except AttributeError:
486 486 # servers older than d1b16a746db6 will send 'unbundle' as a
487 487 # boolean capability. They only support headerless/uncompressed
488 488 # bundles.
489 489 types = [""]
490 490 for x in types:
491 491 if x in bundle2.bundletypes:
492 492 type = x
493 493 break
494 494
495 495 tempname = bundle2.writebundle(self.ui, cg, None, type)
496 496 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
497 497 headers = {r'Content-Type': r'application/mercurial-0.1'}
498 498
499 499 try:
500 500 r = self._call(cmd, data=fp, headers=headers, **args)
501 501 vals = r.split('\n', 1)
502 502 if len(vals) < 2:
503 503 raise error.ResponseError(_("unexpected response:"), r)
504 504 return vals
505 505 except urlerr.httperror:
506 506 # Catch and re-raise these so we don't try and treat them
507 507 # like generic socket errors. They lack any values in
508 508 # .args on Python 3 which breaks our socket.error block.
509 509 raise
510 510 except socket.error as err:
511 511 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
512 512 raise error.Abort(_('push failed: %s') % err.args[1])
513 513 raise error.Abort(err.args[1])
514 514 finally:
515 515 fp.close()
516 516 os.unlink(tempname)
517 517
518 518 def _calltwowaystream(self, cmd, fp, **args):
519 519 fh = None
520 520 fp_ = None
521 521 filename = None
522 522 try:
523 523 # dump bundle to disk
524 524 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
525 525 fh = os.fdopen(fd, r"wb")
526 526 d = fp.read(4096)
527 527 while d:
528 528 fh.write(d)
529 529 d = fp.read(4096)
530 530 fh.close()
531 531 # start http push
532 532 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
533 533 headers = {r'Content-Type': r'application/mercurial-0.1'}
534 534 return self._callstream(cmd, data=fp_, headers=headers, **args)
535 535 finally:
536 536 if fp_ is not None:
537 537 fp_.close()
538 538 if fh is not None:
539 539 fh.close()
540 540 os.unlink(filename)
541 541
542 542 def _callcompressable(self, cmd, **args):
543 543 return self._callstream(cmd, _compressible=True, **args)
544 544
545 545 def _abort(self, exception):
546 546 raise exception
547 547
548 548 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
549 549 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
550 550 buffersends=True)
551 551
552 552 handler = wireprotov2peer.clienthandler(ui, reactor)
553 553
554 554 url = '%s/%s' % (apiurl, permission)
555 555
556 556 if len(requests) > 1:
557 557 url += '/multirequest'
558 558 else:
559 559 url += '/%s' % requests[0][0]
560 560
561 561 ui.debug('sending %d commands\n' % len(requests))
562 562 for command, args, f in requests:
563 563 ui.debug('sending command %s: %s\n' % (
564 564 command, stringutil.pprint(args, indent=2)))
565 565 assert not list(handler.callcommand(command, args, f))
566 566
567 567 # TODO stream this.
568 568 body = b''.join(map(bytes, handler.flushcommands()))
569 569
570 570 # TODO modify user-agent to reflect v2
571 571 headers = {
572 572 r'Accept': wireprotov2server.FRAMINGTYPE,
573 573 r'Content-Type': wireprotov2server.FRAMINGTYPE,
574 574 }
575 575
576 576 req = requestbuilder(pycompat.strurl(url), body, headers)
577 577 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
578 578
579 579 try:
580 580 res = opener.open(req)
581 581 except urlerr.httperror as e:
582 582 if e.code == 401:
583 583 raise error.Abort(_('authorization failed'))
584 584
585 585 raise
586 586 except httplib.HTTPException as e:
587 587 ui.traceback()
588 588 raise IOError(None, e)
589 589
590 590 return handler, res
591 591
592 592 class queuedcommandfuture(pycompat.futures.Future):
593 593 """Wraps result() on command futures to trigger submission on call."""
594 594
595 595 def result(self, timeout=None):
596 596 if self.done():
597 597 return pycompat.futures.Future.result(self, timeout)
598 598
599 599 self._peerexecutor.sendcommands()
600 600
601 601 # sendcommands() will restore the original __class__ and self.result
602 602 # will resolve to Future.result.
603 603 return self.result(timeout)
604 604
605 605 @interfaceutil.implementer(repository.ipeercommandexecutor)
606 606 class httpv2executor(object):
607 607 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
608 608 self._ui = ui
609 609 self._opener = opener
610 610 self._requestbuilder = requestbuilder
611 611 self._apiurl = apiurl
612 612 self._descriptor = descriptor
613 613 self._sent = False
614 614 self._closed = False
615 615 self._neededpermissions = set()
616 616 self._calls = []
617 617 self._futures = weakref.WeakSet()
618 618 self._responseexecutor = None
619 619 self._responsef = None
620 620
621 621 def __enter__(self):
622 622 return self
623 623
624 624 def __exit__(self, exctype, excvalue, exctb):
625 625 self.close()
626 626
627 627 def callcommand(self, command, args):
628 628 if self._sent:
629 629 raise error.ProgrammingError('callcommand() cannot be used after '
630 630 'commands are sent')
631 631
632 632 if self._closed:
633 633 raise error.ProgrammingError('callcommand() cannot be used after '
634 634 'close()')
635 635
636 636 # The service advertises which commands are available. So if we attempt
637 637 # to call an unknown command or pass an unknown argument, we can screen
638 638 # for this.
639 639 if command not in self._descriptor['commands']:
640 640 raise error.ProgrammingError(
641 641 'wire protocol command %s is not available' % command)
642 642
643 643 cmdinfo = self._descriptor['commands'][command]
644 644 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
645 645
646 646 if unknownargs:
647 647 raise error.ProgrammingError(
648 648 'wire protocol command %s does not accept argument: %s' % (
649 649 command, ', '.join(sorted(unknownargs))))
650 650
651 651 self._neededpermissions |= set(cmdinfo['permissions'])
652 652
653 653 # TODO we /could/ also validate types here, since the API descriptor
654 654 # includes types...
655 655
656 656 f = pycompat.futures.Future()
657 657
658 658 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
659 659 # could deadlock.
660 660 f.__class__ = queuedcommandfuture
661 661 f._peerexecutor = self
662 662
663 663 self._futures.add(f)
664 664 self._calls.append((command, args, f))
665 665
666 666 return f
667 667
668 668 def sendcommands(self):
669 669 if self._sent:
670 670 return
671 671
672 672 if not self._calls:
673 673 return
674 674
675 675 self._sent = True
676 676
677 677 # Unhack any future types so caller sees a clean type and so we
678 678 # break reference cycle.
679 679 for f in self._futures:
680 680 if isinstance(f, queuedcommandfuture):
681 681 f.__class__ = pycompat.futures.Future
682 682 f._peerexecutor = None
683 683
684 684 # Mark the future as running and filter out cancelled futures.
685 685 calls = [(command, args, f)
686 686 for command, args, f in self._calls
687 687 if f.set_running_or_notify_cancel()]
688 688
689 689 # Clear out references, prevent improper object usage.
690 690 self._calls = None
691 691
692 692 if not calls:
693 693 return
694 694
695 695 permissions = set(self._neededpermissions)
696 696
697 697 if 'push' in permissions and 'pull' in permissions:
698 698 permissions.remove('pull')
699 699
700 700 if len(permissions) > 1:
701 701 raise error.RepoError(_('cannot make request requiring multiple '
702 702 'permissions: %s') %
703 703 _(', ').join(sorted(permissions)))
704 704
705 705 permission = {
706 706 'push': 'rw',
707 707 'pull': 'ro',
708 708 }[permissions.pop()]
709 709
710 710 handler, resp = sendv2request(
711 711 self._ui, self._opener, self._requestbuilder, self._apiurl,
712 712 permission, calls)
713 713
714 714 # TODO we probably want to validate the HTTP code, media type, etc.
715 715
716 716 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
717 717 self._responsef = self._responseexecutor.submit(self._handleresponse,
718 718 handler, resp)
719 719
720 720 def close(self):
721 721 if self._closed:
722 722 return
723 723
724 724 self.sendcommands()
725 725
726 726 self._closed = True
727 727
728 728 if not self._responsef:
729 729 return
730 730
731 # TODO ^C here may not result in immediate program termination.
732
731 733 try:
732 734 self._responsef.result()
733 735 finally:
734 736 self._responseexecutor.shutdown(wait=True)
735 737 self._responsef = None
736 738 self._responseexecutor = None
737 739
738 740 # If any of our futures are still in progress, mark them as
739 741 # errored, otherwise a result() could wait indefinitely.
740 742 for f in self._futures:
741 743 if not f.done():
742 744 f.set_exception(error.ResponseError(
743 745 _('unfulfilled command response')))
744 746
745 747 self._futures = None
746 748
747 749 def _handleresponse(self, handler, resp):
748 750 # Called in a thread to read the response.
749 751
750 752 while handler.readframe(resp):
751 753 pass
752 754
753 755 # TODO implement interface for version 2 peers
754 756 @interfaceutil.implementer(repository.ipeerconnection,
755 757 repository.ipeercapabilities,
756 758 repository.ipeerrequests)
757 759 class httpv2peer(object):
758 760 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
759 761 apidescriptor):
760 762 self.ui = ui
761 763
762 764 if repourl.endswith('/'):
763 765 repourl = repourl[:-1]
764 766
765 767 self._url = repourl
766 768 self._apipath = apipath
767 769 self._apiurl = '%s/%s' % (repourl, apipath)
768 770 self._opener = opener
769 771 self._requestbuilder = requestbuilder
770 772 self._descriptor = apidescriptor
771 773
772 774 # Start of ipeerconnection.
773 775
774 776 def url(self):
775 777 return self._url
776 778
777 779 def local(self):
778 780 return None
779 781
780 782 def peer(self):
781 783 return self
782 784
783 785 def canpush(self):
784 786 # TODO change once implemented.
785 787 return False
786 788
787 789 def close(self):
788 790 pass
789 791
790 792 # End of ipeerconnection.
791 793
792 794 # Start of ipeercapabilities.
793 795
794 796 def capable(self, name):
795 797 # The capabilities used internally historically map to capabilities
796 798 # advertised from the "capabilities" wire protocol command. However,
797 799 # version 2 of that command works differently.
798 800
799 801 # Maps to commands that are available.
800 802 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
801 803 return True
802 804
803 805 # Other concepts.
804 806 if name in ('bundle2',):
805 807 return True
806 808
807 809 return False
808 810
809 811 def requirecap(self, name, purpose):
810 812 if self.capable(name):
811 813 return
812 814
813 815 raise error.CapabilityError(
814 816 _('cannot %s; client or remote repository does not support the %r '
815 817 'capability') % (purpose, name))
816 818
817 819 # End of ipeercapabilities.
818 820
819 821 def _call(self, name, **args):
820 822 with self.commandexecutor() as e:
821 823 return e.callcommand(name, args).result()
822 824
823 825 def commandexecutor(self):
824 826 return httpv2executor(self.ui, self._opener, self._requestbuilder,
825 827 self._apiurl, self._descriptor)
826 828
827 829 # Registry of API service names to metadata about peers that handle it.
828 830 #
829 831 # The following keys are meaningful:
830 832 #
831 833 # init
832 834 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
833 835 # apidescriptor) to create a peer.
834 836 #
835 837 # priority
836 838 # Integer priority for the service. If we could choose from multiple
837 839 # services, we choose the one with the highest priority.
838 840 API_PEERS = {
839 841 wireprototypes.HTTP_WIREPROTO_V2: {
840 842 'init': httpv2peer,
841 843 'priority': 50,
842 844 },
843 845 }
844 846
845 847 def performhandshake(ui, url, opener, requestbuilder):
846 848 # The handshake is a request to the capabilities command.
847 849
848 850 caps = None
849 851 def capable(x):
850 852 raise error.ProgrammingError('should not be called')
851 853
852 854 args = {}
853 855
854 856 # The client advertises support for newer protocols by adding an
855 857 # X-HgUpgrade-* header with a list of supported APIs and an
856 858 # X-HgProto-* header advertising which serializing formats it supports.
857 859 # We only support the HTTP version 2 transport and CBOR responses for
858 860 # now.
859 861 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
860 862
861 863 if advertisev2:
862 864 args['headers'] = {
863 865 r'X-HgProto-1': r'cbor',
864 866 }
865 867
866 868 args['headers'].update(
867 869 encodevalueinheaders(' '.join(sorted(API_PEERS)),
868 870 'X-HgUpgrade',
869 871 # We don't know the header limit this early.
870 872 # So make it small.
871 873 1024))
872 874
873 875 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
874 876 capable, url, 'capabilities',
875 877 args)
876 878 resp = sendrequest(ui, opener, req)
877 879
878 880 # The server may redirect us to the repo root, stripping the
879 881 # ?cmd=capabilities query string from the URL. The server would likely
880 882 # return HTML in this case and ``parsev1commandresponse()`` would raise.
881 883 # We catch this special case and re-issue the capabilities request against
882 884 # the new URL.
883 885 #
884 886 # We should ideally not do this, as a redirect that drops the query
885 887 # string from the URL is arguably a server bug. (Garbage in, garbage out).
886 888 # However, Mercurial clients for several years appeared to handle this
887 889 # issue without behavior degradation. And according to issue 5860, it may
888 890 # be a longstanding bug in some server implementations. So we allow a
889 891 # redirect that drops the query string to "just work."
890 892 try:
891 893 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
892 894 compressible=False,
893 895 allowcbor=advertisev2)
894 896 except RedirectedRepoError as e:
895 897 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
896 898 capable, e.respurl,
897 899 'capabilities', args)
898 900 resp = sendrequest(ui, opener, req)
899 901 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
900 902 compressible=False,
901 903 allowcbor=advertisev2)
902 904
903 905 try:
904 906 rawdata = resp.read()
905 907 finally:
906 908 resp.close()
907 909
908 910 if not ct.startswith('application/mercurial-'):
909 911 raise error.ProgrammingError('unexpected content-type: %s' % ct)
910 912
911 913 if advertisev2:
912 914 if ct == 'application/mercurial-cbor':
913 915 try:
914 916 info = cbor.loads(rawdata)
915 917 except cbor.CBORDecodeError:
916 918 raise error.Abort(_('error decoding CBOR from remote server'),
917 919 hint=_('try again and consider contacting '
918 920 'the server operator'))
919 921
920 922 # We got a legacy response. That's fine.
921 923 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
922 924 info = {
923 925 'v1capabilities': set(rawdata.split())
924 926 }
925 927
926 928 else:
927 929 raise error.RepoError(
928 930 _('unexpected response type from server: %s') % ct)
929 931 else:
930 932 info = {
931 933 'v1capabilities': set(rawdata.split())
932 934 }
933 935
934 936 return respurl, info
935 937
936 938 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
937 939 """Construct an appropriate HTTP peer instance.
938 940
939 941 ``opener`` is an ``url.opener`` that should be used to establish
940 942 connections, perform HTTP requests.
941 943
942 944 ``requestbuilder`` is the type used for constructing HTTP requests.
943 945 It exists as an argument so extensions can override the default.
944 946 """
945 947 u = util.url(path)
946 948 if u.query or u.fragment:
947 949 raise error.Abort(_('unsupported URL component: "%s"') %
948 950 (u.query or u.fragment))
949 951
950 952 # urllib cannot handle URLs with embedded user or passwd.
951 953 url, authinfo = u.authinfo()
952 954 ui.debug('using %s\n' % url)
953 955
954 956 opener = opener or urlmod.opener(ui, authinfo)
955 957
956 958 respurl, info = performhandshake(ui, url, opener, requestbuilder)
957 959
958 960 # Given the intersection of APIs that both we and the server support,
959 961 # sort by their advertised priority and pick the first one.
960 962 #
961 963 # TODO consider making this request-based and interface driven. For
962 964 # example, the caller could say "I want a peer that does X." It's quite
963 965 # possible that not all peers would do that. Since we know the service
964 966 # capabilities, we could filter out services not meeting the
965 967 # requirements. Possibly by consulting the interfaces defined by the
966 968 # peer type.
967 969 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
968 970
969 971 preferredchoices = sorted(apipeerchoices,
970 972 key=lambda x: API_PEERS[x]['priority'],
971 973 reverse=True)
972 974
973 975 for service in preferredchoices:
974 976 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
975 977
976 978 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
977 979 requestbuilder,
978 980 info['apis'][service])
979 981
980 982 # Failed to construct an API peer. Fall back to legacy.
981 983 return httppeer(ui, path, respurl, opener, requestbuilder,
982 984 info['v1capabilities'])
983 985
984 986 def instance(ui, path, create, intents=None):
985 987 if create:
986 988 raise error.Abort(_('cannot create new http repository'))
987 989 try:
988 990 if path.startswith('https:') and not urlmod.has_https:
989 991 raise error.Abort(_('Python support for SSL and HTTPS '
990 992 'is not installed'))
991 993
992 994 inst = makepeer(ui, path)
993 995
994 996 return inst
995 997 except error.RepoError as httpexception:
996 998 try:
997 999 r = statichttprepo.instance(ui, "static-" + path, create)
998 1000 ui.note(_('(falling back to static-http)\n'))
999 1001 return r
1000 1002 except error.RepoError:
1001 1003 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now