##// END OF EJS Templates
httppeer: properly gate debug usage behind debug flag check...
Boris Feld -
r38138:f9dc1d5b stable
parent child Browse files
Show More
@@ -1,996 +1,996
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17 import weakref
18 18
19 19 from .i18n import _
20 20 from .thirdparty import (
21 21 cbor,
22 22 )
23 23 from . import (
24 24 bundle2,
25 25 error,
26 26 httpconnection,
27 27 pycompat,
28 28 repository,
29 29 statichttprepo,
30 30 url as urlmod,
31 31 util,
32 32 wireprotoframing,
33 33 wireprototypes,
34 34 wireprotov1peer,
35 35 wireprotov2peer,
36 36 wireprotov2server,
37 37 )
38 38 from .utils import (
39 39 interfaceutil,
40 40 )
41 41
42 42 httplib = util.httplib
43 43 urlerr = util.urlerr
44 44 urlreq = util.urlreq
45 45
46 46 def encodevalueinheaders(value, header, limit):
47 47 """Encode a string value into multiple HTTP headers.
48 48
49 49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 51 name + value will be at most ``limit`` bytes long.
52 52
53 53 Returns an iterable of 2-tuples consisting of header names and
54 54 values as native strings.
55 55 """
56 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 57 # not bytes. This function always takes bytes in as arguments.
58 58 fmt = pycompat.strurl(header) + r'-%s'
59 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 60 # and not a unicode: we're just getting the encoded length anyway,
61 61 # and using an r-string to make it portable between Python 2 and 3
62 62 # doesn't work because then the \r is a literal backslash-r
63 63 # instead of a carriage return.
64 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 65 result = []
66 66
67 67 n = 0
68 68 for i in xrange(0, len(value), valuelen):
69 69 n += 1
70 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 71
72 72 return result
73 73
74 74 def _wraphttpresponse(resp):
75 75 """Wrap an HTTPResponse with common error handlers.
76 76
77 77 This ensures that any I/O from any consumer raises the appropriate
78 78 error and messaging.
79 79 """
80 80 origread = resp.read
81 81
82 82 class readerproxy(resp.__class__):
83 83 def read(self, size=None):
84 84 try:
85 85 return origread(size)
86 86 except httplib.IncompleteRead as e:
87 87 # e.expected is an integer if length known or None otherwise.
88 88 if e.expected:
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (e.expected,
91 91 len(e.partial))
92 92 else:
93 93 msg = _('HTTP request error (incomplete response)')
94 94
95 95 raise error.PeerTransportError(
96 96 msg,
97 97 hint=_('this may be an intermittent network failure; '
98 98 'if the error persists, consider contacting the '
99 99 'network or server operator'))
100 100 except httplib.HTTPException as e:
101 101 raise error.PeerTransportError(
102 102 _('HTTP request error (%s)') % e,
103 103 hint=_('this may be an intermittent network failure; '
104 104 'if the error persists, consider contacting the '
105 105 'network or server operator'))
106 106
107 107 resp.__class__ = readerproxy
108 108
109 109 class _multifile(object):
110 110 def __init__(self, *fileobjs):
111 111 for f in fileobjs:
112 112 if not util.safehasattr(f, 'length'):
113 113 raise ValueError(
114 114 '_multifile only supports file objects that '
115 115 'have a length but this one does not:', type(f), f)
116 116 self._fileobjs = fileobjs
117 117 self._index = 0
118 118
119 119 @property
120 120 def length(self):
121 121 return sum(f.length for f in self._fileobjs)
122 122
123 123 def read(self, amt=None):
124 124 if amt <= 0:
125 125 return ''.join(f.read() for f in self._fileobjs)
126 126 parts = []
127 127 while amt and self._index < len(self._fileobjs):
128 128 parts.append(self._fileobjs[self._index].read(amt))
129 129 got = len(parts[-1])
130 130 if got < amt:
131 131 self._index += 1
132 132 amt -= got
133 133 return ''.join(parts)
134 134
135 135 def seek(self, offset, whence=os.SEEK_SET):
136 136 if whence != os.SEEK_SET:
137 137 raise NotImplementedError(
138 138 '_multifile does not support anything other'
139 139 ' than os.SEEK_SET for whence on seek()')
140 140 if offset != 0:
141 141 raise NotImplementedError(
142 142 '_multifile only supports seeking to start, but that '
143 143 'could be fixed if you need it')
144 144 for f in self._fileobjs:
145 145 f.seek(0)
146 146 self._index = 0
147 147
148 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 149 repobaseurl, cmd, args):
150 150 """Make an HTTP request to run a command for a version 1 client.
151 151
152 152 ``caps`` is a set of known server capabilities. The value may be
153 153 None if capabilities are not yet known.
154 154
155 155 ``capablefn`` is a function to evaluate a capability.
156 156
157 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 158 raw data to pass to it.
159 159 """
160 160 if cmd == 'pushkey':
161 161 args['data'] = ''
162 162 data = args.pop('data', None)
163 163 headers = args.pop('headers', {})
164 164
165 165 ui.debug("sending %s command\n" % cmd)
166 166 q = [('cmd', cmd)]
167 167 headersize = 0
168 168 # Important: don't use self.capable() here or else you end up
169 169 # with infinite recursion when trying to look up capabilities
170 170 # for the first time.
171 171 postargsok = caps is not None and 'httppostargs' in caps
172 172
173 173 # Send arguments via POST.
174 174 if postargsok and args:
175 175 strargs = urlreq.urlencode(sorted(args.items()))
176 176 if not data:
177 177 data = strargs
178 178 else:
179 179 if isinstance(data, bytes):
180 180 i = io.BytesIO(data)
181 181 i.length = len(data)
182 182 data = i
183 183 argsio = io.BytesIO(strargs)
184 184 argsio.length = len(strargs)
185 185 data = _multifile(argsio, data)
186 186 headers[r'X-HgArgs-Post'] = len(strargs)
187 187 elif args:
188 188 # Calling self.capable() can infinite loop if we are calling
189 189 # "capabilities". But that command should never accept wire
190 190 # protocol arguments. So this should never happen.
191 191 assert cmd != 'capabilities'
192 192 httpheader = capablefn('httpheader')
193 193 if httpheader:
194 194 headersize = int(httpheader.split(',', 1)[0])
195 195
196 196 # Send arguments via HTTP headers.
197 197 if headersize > 0:
198 198 # The headers can typically carry more data than the URL.
199 199 encargs = urlreq.urlencode(sorted(args.items()))
200 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 201 headersize):
202 202 headers[header] = value
203 203 # Send arguments via query string (Mercurial <1.9).
204 204 else:
205 205 q += sorted(args.items())
206 206
207 207 qs = '?%s' % urlreq.urlencode(q)
208 208 cu = "%s%s" % (repobaseurl, qs)
209 209 size = 0
210 210 if util.safehasattr(data, 'length'):
211 211 size = data.length
212 212 elif data is not None:
213 213 size = len(data)
214 214 if data is not None and r'Content-Type' not in headers:
215 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216 216
217 217 # Tell the server we accept application/mercurial-0.2 and multiple
218 218 # compression formats if the server is capable of emitting those
219 219 # payloads.
220 220 # Note: Keep this set empty by default, as client advertisement of
221 221 # protocol parameters should only occur after the handshake.
222 222 protoparams = set()
223 223
224 224 mediatypes = set()
225 225 if caps is not None:
226 226 mt = capablefn('httpmediatype')
227 227 if mt:
228 228 protoparams.add('0.1')
229 229 mediatypes = set(mt.split(','))
230 230
231 231 protoparams.add('partial-pull')
232 232
233 233 if '0.2tx' in mediatypes:
234 234 protoparams.add('0.2')
235 235
236 236 if '0.2tx' in mediatypes and capablefn('compression'):
237 237 # We /could/ compare supported compression formats and prune
238 238 # non-mutually supported or error if nothing is mutually supported.
239 239 # For now, send the full list to the server and have it error.
240 240 comps = [e.wireprotosupport().name for e in
241 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 242 protoparams.add('comp=%s' % ','.join(comps))
243 243
244 244 if protoparams:
245 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 246 'X-HgProto',
247 247 headersize or 1024)
248 248 for header, value in protoheaders:
249 249 headers[header] = value
250 250
251 251 varyheaders = []
252 252 for header in headers:
253 253 if header.lower().startswith(r'x-hg'):
254 254 varyheaders.append(header)
255 255
256 256 if varyheaders:
257 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258 258
259 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260 260
261 261 if data is not None:
262 262 ui.debug("sending %d bytes\n" % size)
263 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264 264
265 265 return req, cu, qs
266 266
267 267 def _reqdata(req):
268 268 """Get request data, if any. If no data, returns None."""
269 269 if pycompat.ispy3:
270 270 return req.data
271 271 if not req.has_data():
272 272 return None
273 273 return req.get_data()
274 274
275 275 def sendrequest(ui, opener, req):
276 276 """Send a prepared HTTP request.
277 277
278 278 Returns the response object.
279 279 """
280 280 if (ui.debugflag
281 281 and ui.configbool('devel', 'debug.peer-request')):
282 282 dbg = ui.debug
283 283 line = 'devel-peer-request: %s\n'
284 284 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
285 285 pycompat.bytesurl(req.get_full_url())))
286 286 hgargssize = None
287 287
288 288 for header, value in sorted(req.header_items()):
289 289 header = pycompat.bytesurl(header)
290 290 value = pycompat.bytesurl(value)
291 291 if header.startswith('X-hgarg-'):
292 292 if hgargssize is None:
293 293 hgargssize = 0
294 294 hgargssize += len(value)
295 295 else:
296 296 dbg(line % ' %s %s' % (header, value))
297 297
298 298 if hgargssize is not None:
299 299 dbg(line % ' %d bytes of commands arguments in headers'
300 300 % hgargssize)
301 301 data = _reqdata(req)
302 302 if data is not None:
303 303 length = getattr(data, 'length', None)
304 304 if length is None:
305 305 length = len(data)
306 306 dbg(line % ' %d bytes of data' % length)
307 307
308 308 start = util.timer()
309 309
310 310 try:
311 311 res = opener.open(req)
312 312 except urlerr.httperror as inst:
313 313 if inst.code == 401:
314 314 raise error.Abort(_('authorization failed'))
315 315 raise
316 316 except httplib.HTTPException as inst:
317 317 ui.debug('http error requesting %s\n' %
318 318 util.hidepassword(req.get_full_url()))
319 319 ui.traceback()
320 320 raise IOError(None, inst)
321 321 finally:
322 if ui.configbool('devel', 'debug.peer-request'):
322 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
323 323 dbg(line % ' finished in %.4f seconds (%d)'
324 324 % (util.timer() - start, res.code))
325 325
326 326 # Insert error handlers for common I/O failures.
327 327 _wraphttpresponse(res)
328 328
329 329 return res
330 330
331 331 class RedirectedRepoError(error.RepoError):
332 332 def __init__(self, msg, respurl):
333 333 super(RedirectedRepoError, self).__init__(msg)
334 334 self.respurl = respurl
335 335
336 336 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
337 337 allowcbor=False):
338 338 # record the url we got redirected to
339 339 redirected = False
340 340 respurl = pycompat.bytesurl(resp.geturl())
341 341 if respurl.endswith(qs):
342 342 respurl = respurl[:-len(qs)]
343 343 qsdropped = False
344 344 else:
345 345 qsdropped = True
346 346
347 347 if baseurl.rstrip('/') != respurl.rstrip('/'):
348 348 redirected = True
349 349 if not ui.quiet:
350 350 ui.warn(_('real URL is %s\n') % respurl)
351 351
352 352 try:
353 353 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
354 354 except AttributeError:
355 355 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
356 356
357 357 safeurl = util.hidepassword(baseurl)
358 358 if proto.startswith('application/hg-error'):
359 359 raise error.OutOfBandError(resp.read())
360 360
361 361 # Pre 1.0 versions of Mercurial used text/plain and
362 362 # application/hg-changegroup. We don't support such old servers.
363 363 if not proto.startswith('application/mercurial-'):
364 364 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
365 365 msg = _("'%s' does not appear to be an hg repository:\n"
366 366 "---%%<--- (%s)\n%s\n---%%<---\n") % (
367 367 safeurl, proto or 'no content-type', resp.read(1024))
368 368
369 369 # Some servers may strip the query string from the redirect. We
370 370 # raise a special error type so callers can react to this specially.
371 371 if redirected and qsdropped:
372 372 raise RedirectedRepoError(msg, respurl)
373 373 else:
374 374 raise error.RepoError(msg)
375 375
376 376 try:
377 377 subtype = proto.split('-', 1)[1]
378 378
379 379 # Unless we end up supporting CBOR in the legacy wire protocol,
380 380 # this should ONLY be encountered for the initial capabilities
381 381 # request during handshake.
382 382 if subtype == 'cbor':
383 383 if allowcbor:
384 384 return respurl, proto, resp
385 385 else:
386 386 raise error.RepoError(_('unexpected CBOR response from '
387 387 'server'))
388 388
389 389 version_info = tuple([int(n) for n in subtype.split('.')])
390 390 except ValueError:
391 391 raise error.RepoError(_("'%s' sent a broken Content-Type "
392 392 "header (%s)") % (safeurl, proto))
393 393
394 394 # TODO consider switching to a decompression reader that uses
395 395 # generators.
396 396 if version_info == (0, 1):
397 397 if compressible:
398 398 resp = util.compengines['zlib'].decompressorreader(resp)
399 399
400 400 elif version_info == (0, 2):
401 401 # application/mercurial-0.2 always identifies the compression
402 402 # engine in the payload header.
403 403 elen = struct.unpack('B', resp.read(1))[0]
404 404 ename = resp.read(elen)
405 405 engine = util.compengines.forwiretype(ename)
406 406
407 407 resp = engine.decompressorreader(resp)
408 408 else:
409 409 raise error.RepoError(_("'%s' uses newer protocol %s") %
410 410 (safeurl, subtype))
411 411
412 412 return respurl, proto, resp
413 413
414 414 class httppeer(wireprotov1peer.wirepeer):
415 415 def __init__(self, ui, path, url, opener, requestbuilder, caps):
416 416 self.ui = ui
417 417 self._path = path
418 418 self._url = url
419 419 self._caps = caps
420 420 self._urlopener = opener
421 421 self._requestbuilder = requestbuilder
422 422
423 423 def __del__(self):
424 424 for h in self._urlopener.handlers:
425 425 h.close()
426 426 getattr(h, "close_all", lambda: None)()
427 427
428 428 # Begin of ipeerconnection interface.
429 429
430 430 def url(self):
431 431 return self._path
432 432
433 433 def local(self):
434 434 return None
435 435
436 436 def peer(self):
437 437 return self
438 438
439 439 def canpush(self):
440 440 return True
441 441
442 442 def close(self):
443 443 pass
444 444
445 445 # End of ipeerconnection interface.
446 446
447 447 # Begin of ipeercommands interface.
448 448
449 449 def capabilities(self):
450 450 return self._caps
451 451
452 452 # End of ipeercommands interface.
453 453
454 454 def _callstream(self, cmd, _compressible=False, **args):
455 455 args = pycompat.byteskwargs(args)
456 456
457 457 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
458 458 self._caps, self.capable,
459 459 self._url, cmd, args)
460 460
461 461 resp = sendrequest(self.ui, self._urlopener, req)
462 462
463 463 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
464 464 resp, _compressible)
465 465
466 466 return resp
467 467
468 468 def _call(self, cmd, **args):
469 469 fp = self._callstream(cmd, **args)
470 470 try:
471 471 return fp.read()
472 472 finally:
473 473 # if using keepalive, allow connection to be reused
474 474 fp.close()
475 475
476 476 def _callpush(self, cmd, cg, **args):
477 477 # have to stream bundle to a temp file because we do not have
478 478 # http 1.1 chunked transfer.
479 479
480 480 types = self.capable('unbundle')
481 481 try:
482 482 types = types.split(',')
483 483 except AttributeError:
484 484 # servers older than d1b16a746db6 will send 'unbundle' as a
485 485 # boolean capability. They only support headerless/uncompressed
486 486 # bundles.
487 487 types = [""]
488 488 for x in types:
489 489 if x in bundle2.bundletypes:
490 490 type = x
491 491 break
492 492
493 493 tempname = bundle2.writebundle(self.ui, cg, None, type)
494 494 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
495 495 headers = {r'Content-Type': r'application/mercurial-0.1'}
496 496
497 497 try:
498 498 r = self._call(cmd, data=fp, headers=headers, **args)
499 499 vals = r.split('\n', 1)
500 500 if len(vals) < 2:
501 501 raise error.ResponseError(_("unexpected response:"), r)
502 502 return vals
503 503 except urlerr.httperror:
504 504 # Catch and re-raise these so we don't try and treat them
505 505 # like generic socket errors. They lack any values in
506 506 # .args on Python 3 which breaks our socket.error block.
507 507 raise
508 508 except socket.error as err:
509 509 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
510 510 raise error.Abort(_('push failed: %s') % err.args[1])
511 511 raise error.Abort(err.args[1])
512 512 finally:
513 513 fp.close()
514 514 os.unlink(tempname)
515 515
516 516 def _calltwowaystream(self, cmd, fp, **args):
517 517 fh = None
518 518 fp_ = None
519 519 filename = None
520 520 try:
521 521 # dump bundle to disk
522 522 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
523 523 fh = os.fdopen(fd, r"wb")
524 524 d = fp.read(4096)
525 525 while d:
526 526 fh.write(d)
527 527 d = fp.read(4096)
528 528 fh.close()
529 529 # start http push
530 530 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
531 531 headers = {r'Content-Type': r'application/mercurial-0.1'}
532 532 return self._callstream(cmd, data=fp_, headers=headers, **args)
533 533 finally:
534 534 if fp_ is not None:
535 535 fp_.close()
536 536 if fh is not None:
537 537 fh.close()
538 538 os.unlink(filename)
539 539
540 540 def _callcompressable(self, cmd, **args):
541 541 return self._callstream(cmd, _compressible=True, **args)
542 542
543 543 def _abort(self, exception):
544 544 raise exception
545 545
546 546 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
547 547 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
548 548 buffersends=True)
549 549
550 550 handler = wireprotov2peer.clienthandler(ui, reactor)
551 551
552 552 url = '%s/%s' % (apiurl, permission)
553 553
554 554 if len(requests) > 1:
555 555 url += '/multirequest'
556 556 else:
557 557 url += '/%s' % requests[0][0]
558 558
559 559 for command, args, f in requests:
560 560 assert not list(handler.callcommand(command, args, f))
561 561
562 562 # TODO stream this.
563 563 body = b''.join(map(bytes, handler.flushcommands()))
564 564
565 565 # TODO modify user-agent to reflect v2
566 566 headers = {
567 567 r'Accept': wireprotov2server.FRAMINGTYPE,
568 568 r'Content-Type': wireprotov2server.FRAMINGTYPE,
569 569 }
570 570
571 571 req = requestbuilder(pycompat.strurl(url), body, headers)
572 572 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
573 573
574 574 try:
575 575 res = opener.open(req)
576 576 except urlerr.httperror as e:
577 577 if e.code == 401:
578 578 raise error.Abort(_('authorization failed'))
579 579
580 580 raise
581 581 except httplib.HTTPException as e:
582 582 ui.traceback()
583 583 raise IOError(None, e)
584 584
585 585 return handler, res
586 586
587 587 class queuedcommandfuture(pycompat.futures.Future):
588 588 """Wraps result() on command futures to trigger submission on call."""
589 589
590 590 def result(self, timeout=None):
591 591 if self.done():
592 592 return pycompat.futures.Future.result(self, timeout)
593 593
594 594 self._peerexecutor.sendcommands()
595 595
596 596 # sendcommands() will restore the original __class__ and self.result
597 597 # will resolve to Future.result.
598 598 return self.result(timeout)
599 599
600 600 @interfaceutil.implementer(repository.ipeercommandexecutor)
601 601 class httpv2executor(object):
602 602 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
603 603 self._ui = ui
604 604 self._opener = opener
605 605 self._requestbuilder = requestbuilder
606 606 self._apiurl = apiurl
607 607 self._descriptor = descriptor
608 608 self._sent = False
609 609 self._closed = False
610 610 self._neededpermissions = set()
611 611 self._calls = []
612 612 self._futures = weakref.WeakSet()
613 613 self._responseexecutor = None
614 614 self._responsef = None
615 615
616 616 def __enter__(self):
617 617 return self
618 618
619 619 def __exit__(self, exctype, excvalue, exctb):
620 620 self.close()
621 621
622 622 def callcommand(self, command, args):
623 623 if self._sent:
624 624 raise error.ProgrammingError('callcommand() cannot be used after '
625 625 'commands are sent')
626 626
627 627 if self._closed:
628 628 raise error.ProgrammingError('callcommand() cannot be used after '
629 629 'close()')
630 630
631 631 # The service advertises which commands are available. So if we attempt
632 632 # to call an unknown command or pass an unknown argument, we can screen
633 633 # for this.
634 634 if command not in self._descriptor['commands']:
635 635 raise error.ProgrammingError(
636 636 'wire protocol command %s is not available' % command)
637 637
638 638 cmdinfo = self._descriptor['commands'][command]
639 639 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
640 640
641 641 if unknownargs:
642 642 raise error.ProgrammingError(
643 643 'wire protocol command %s does not accept argument: %s' % (
644 644 command, ', '.join(sorted(unknownargs))))
645 645
646 646 self._neededpermissions |= set(cmdinfo['permissions'])
647 647
648 648 # TODO we /could/ also validate types here, since the API descriptor
649 649 # includes types...
650 650
651 651 f = pycompat.futures.Future()
652 652
653 653 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
654 654 # could deadlock.
655 655 f.__class__ = queuedcommandfuture
656 656 f._peerexecutor = self
657 657
658 658 self._futures.add(f)
659 659 self._calls.append((command, args, f))
660 660
661 661 return f
662 662
663 663 def sendcommands(self):
664 664 if self._sent:
665 665 return
666 666
667 667 if not self._calls:
668 668 return
669 669
670 670 self._sent = True
671 671
672 672 # Unhack any future types so caller sees a clean type and so we
673 673 # break reference cycle.
674 674 for f in self._futures:
675 675 if isinstance(f, queuedcommandfuture):
676 676 f.__class__ = pycompat.futures.Future
677 677 f._peerexecutor = None
678 678
679 679 # Mark the future as running and filter out cancelled futures.
680 680 calls = [(command, args, f)
681 681 for command, args, f in self._calls
682 682 if f.set_running_or_notify_cancel()]
683 683
684 684 # Clear out references, prevent improper object usage.
685 685 self._calls = None
686 686
687 687 if not calls:
688 688 return
689 689
690 690 permissions = set(self._neededpermissions)
691 691
692 692 if 'push' in permissions and 'pull' in permissions:
693 693 permissions.remove('pull')
694 694
695 695 if len(permissions) > 1:
696 696 raise error.RepoError(_('cannot make request requiring multiple '
697 697 'permissions: %s') %
698 698 _(', ').join(sorted(permissions)))
699 699
700 700 permission = {
701 701 'push': 'rw',
702 702 'pull': 'ro',
703 703 }[permissions.pop()]
704 704
705 705 handler, resp = sendv2request(
706 706 self._ui, self._opener, self._requestbuilder, self._apiurl,
707 707 permission, calls)
708 708
709 709 # TODO we probably want to validate the HTTP code, media type, etc.
710 710
711 711 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
712 712 self._responsef = self._responseexecutor.submit(self._handleresponse,
713 713 handler, resp)
714 714
715 715 def close(self):
716 716 if self._closed:
717 717 return
718 718
719 719 self.sendcommands()
720 720
721 721 self._closed = True
722 722
723 723 if not self._responsef:
724 724 return
725 725
726 726 try:
727 727 self._responsef.result()
728 728 finally:
729 729 self._responseexecutor.shutdown(wait=True)
730 730 self._responsef = None
731 731 self._responseexecutor = None
732 732
733 733 # If any of our futures are still in progress, mark them as
734 734 # errored, otherwise a result() could wait indefinitely.
735 735 for f in self._futures:
736 736 if not f.done():
737 737 f.set_exception(error.ResponseError(
738 738 _('unfulfilled command response')))
739 739
740 740 self._futures = None
741 741
742 742 def _handleresponse(self, handler, resp):
743 743 # Called in a thread to read the response.
744 744
745 745 while handler.readframe(resp):
746 746 pass
747 747
748 748 # TODO implement interface for version 2 peers
749 749 @interfaceutil.implementer(repository.ipeerconnection,
750 750 repository.ipeercapabilities,
751 751 repository.ipeerrequests)
752 752 class httpv2peer(object):
753 753 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
754 754 apidescriptor):
755 755 self.ui = ui
756 756
757 757 if repourl.endswith('/'):
758 758 repourl = repourl[:-1]
759 759
760 760 self._url = repourl
761 761 self._apipath = apipath
762 762 self._apiurl = '%s/%s' % (repourl, apipath)
763 763 self._opener = opener
764 764 self._requestbuilder = requestbuilder
765 765 self._descriptor = apidescriptor
766 766
767 767 # Start of ipeerconnection.
768 768
769 769 def url(self):
770 770 return self._url
771 771
772 772 def local(self):
773 773 return None
774 774
775 775 def peer(self):
776 776 return self
777 777
778 778 def canpush(self):
779 779 # TODO change once implemented.
780 780 return False
781 781
782 782 def close(self):
783 783 pass
784 784
785 785 # End of ipeerconnection.
786 786
787 787 # Start of ipeercapabilities.
788 788
789 789 def capable(self, name):
790 790 # The capabilities used internally historically map to capabilities
791 791 # advertised from the "capabilities" wire protocol command. However,
792 792 # version 2 of that command works differently.
793 793
794 794 # Maps to commands that are available.
795 795 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
796 796 return True
797 797
798 798 # Other concepts.
799 799 if name in ('bundle2',):
800 800 return True
801 801
802 802 return False
803 803
804 804 def requirecap(self, name, purpose):
805 805 if self.capable(name):
806 806 return
807 807
808 808 raise error.CapabilityError(
809 809 _('cannot %s; client or remote repository does not support the %r '
810 810 'capability') % (purpose, name))
811 811
812 812 # End of ipeercapabilities.
813 813
814 814 def _call(self, name, **args):
815 815 with self.commandexecutor() as e:
816 816 return e.callcommand(name, args).result()
817 817
818 818 def commandexecutor(self):
819 819 return httpv2executor(self.ui, self._opener, self._requestbuilder,
820 820 self._apiurl, self._descriptor)
821 821
822 822 # Registry of API service names to metadata about peers that handle it.
823 823 #
824 824 # The following keys are meaningful:
825 825 #
826 826 # init
827 827 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
828 828 # apidescriptor) to create a peer.
829 829 #
830 830 # priority
831 831 # Integer priority for the service. If we could choose from multiple
832 832 # services, we choose the one with the highest priority.
833 833 API_PEERS = {
834 834 wireprototypes.HTTP_WIREPROTO_V2: {
835 835 'init': httpv2peer,
836 836 'priority': 50,
837 837 },
838 838 }
839 839
840 840 def performhandshake(ui, url, opener, requestbuilder):
841 841 # The handshake is a request to the capabilities command.
842 842
843 843 caps = None
844 844 def capable(x):
845 845 raise error.ProgrammingError('should not be called')
846 846
847 847 args = {}
848 848
849 849 # The client advertises support for newer protocols by adding an
850 850 # X-HgUpgrade-* header with a list of supported APIs and an
851 851 # X-HgProto-* header advertising which serializing formats it supports.
852 852 # We only support the HTTP version 2 transport and CBOR responses for
853 853 # now.
854 854 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
855 855
856 856 if advertisev2:
857 857 args['headers'] = {
858 858 r'X-HgProto-1': r'cbor',
859 859 }
860 860
861 861 args['headers'].update(
862 862 encodevalueinheaders(' '.join(sorted(API_PEERS)),
863 863 'X-HgUpgrade',
864 864 # We don't know the header limit this early.
865 865 # So make it small.
866 866 1024))
867 867
868 868 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
869 869 capable, url, 'capabilities',
870 870 args)
871 871 resp = sendrequest(ui, opener, req)
872 872
873 873 # The server may redirect us to the repo root, stripping the
874 874 # ?cmd=capabilities query string from the URL. The server would likely
875 875 # return HTML in this case and ``parsev1commandresponse()`` would raise.
876 876 # We catch this special case and re-issue the capabilities request against
877 877 # the new URL.
878 878 #
879 879 # We should ideally not do this, as a redirect that drops the query
880 880 # string from the URL is arguably a server bug. (Garbage in, garbage out).
881 881 # However, Mercurial clients for several years appeared to handle this
882 882 # issue without behavior degradation. And according to issue 5860, it may
883 883 # be a longstanding bug in some server implementations. So we allow a
884 884 # redirect that drops the query string to "just work."
885 885 try:
886 886 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
887 887 compressible=False,
888 888 allowcbor=advertisev2)
889 889 except RedirectedRepoError as e:
890 890 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
891 891 capable, e.respurl,
892 892 'capabilities', args)
893 893 resp = sendrequest(ui, opener, req)
894 894 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
895 895 compressible=False,
896 896 allowcbor=advertisev2)
897 897
898 898 try:
899 899 rawdata = resp.read()
900 900 finally:
901 901 resp.close()
902 902
903 903 if not ct.startswith('application/mercurial-'):
904 904 raise error.ProgrammingError('unexpected content-type: %s' % ct)
905 905
906 906 if advertisev2:
907 907 if ct == 'application/mercurial-cbor':
908 908 try:
909 909 info = cbor.loads(rawdata)
910 910 except cbor.CBORDecodeError:
911 911 raise error.Abort(_('error decoding CBOR from remote server'),
912 912 hint=_('try again and consider contacting '
913 913 'the server operator'))
914 914
915 915 # We got a legacy response. That's fine.
916 916 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
917 917 info = {
918 918 'v1capabilities': set(rawdata.split())
919 919 }
920 920
921 921 else:
922 922 raise error.RepoError(
923 923 _('unexpected response type from server: %s') % ct)
924 924 else:
925 925 info = {
926 926 'v1capabilities': set(rawdata.split())
927 927 }
928 928
929 929 return respurl, info
930 930
931 931 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
932 932 """Construct an appropriate HTTP peer instance.
933 933
934 934 ``opener`` is an ``url.opener`` that should be used to establish
935 935 connections, perform HTTP requests.
936 936
937 937 ``requestbuilder`` is the type used for constructing HTTP requests.
938 938 It exists as an argument so extensions can override the default.
939 939 """
940 940 u = util.url(path)
941 941 if u.query or u.fragment:
942 942 raise error.Abort(_('unsupported URL component: "%s"') %
943 943 (u.query or u.fragment))
944 944
945 945 # urllib cannot handle URLs with embedded user or passwd.
946 946 url, authinfo = u.authinfo()
947 947 ui.debug('using %s\n' % url)
948 948
949 949 opener = opener or urlmod.opener(ui, authinfo)
950 950
951 951 respurl, info = performhandshake(ui, url, opener, requestbuilder)
952 952
953 953 # Given the intersection of APIs that both we and the server support,
954 954 # sort by their advertised priority and pick the first one.
955 955 #
956 956 # TODO consider making this request-based and interface driven. For
957 957 # example, the caller could say "I want a peer that does X." It's quite
958 958 # possible that not all peers would do that. Since we know the service
959 959 # capabilities, we could filter out services not meeting the
960 960 # requirements. Possibly by consulting the interfaces defined by the
961 961 # peer type.
962 962 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
963 963
964 964 preferredchoices = sorted(apipeerchoices,
965 965 key=lambda x: API_PEERS[x]['priority'],
966 966 reverse=True)
967 967
968 968 for service in preferredchoices:
969 969 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
970 970
971 971 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
972 972 requestbuilder,
973 973 info['apis'][service])
974 974
975 975 # Failed to construct an API peer. Fall back to legacy.
976 976 return httppeer(ui, path, respurl, opener, requestbuilder,
977 977 info['v1capabilities'])
978 978
979 979 def instance(ui, path, create, intents=None):
980 980 if create:
981 981 raise error.Abort(_('cannot create new http repository'))
982 982 try:
983 983 if path.startswith('https:') and not urlmod.has_https:
984 984 raise error.Abort(_('Python support for SSL and HTTPS '
985 985 'is not installed'))
986 986
987 987 inst = makepeer(ui, path)
988 988
989 989 return inst
990 990 except error.RepoError as httpexception:
991 991 try:
992 992 r = statichttprepo.instance(ui, "static-" + path, create)
993 993 ui.note(_('(falling back to static-http)\n'))
994 994 return r
995 995 except error.RepoError:
996 996 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now