##// END OF EJS Templates
httppeer: use our CBOR decoder...
Gregory Szorc -
r39476:cdb56f29 default
parent child Browse files
Show More
@@ -1,1003 +1,1001 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 from .thirdparty import (
20 cbor,
21 )
22 19 from . import (
23 20 bundle2,
24 21 error,
25 22 httpconnection,
26 23 pycompat,
27 24 repository,
28 25 statichttprepo,
29 26 url as urlmod,
30 27 util,
31 28 wireprotoframing,
32 29 wireprototypes,
33 30 wireprotov1peer,
34 31 wireprotov2peer,
35 32 wireprotov2server,
36 33 )
37 34 from .utils import (
35 cborutil,
38 36 interfaceutil,
39 37 stringutil,
40 38 )
41 39
42 40 httplib = util.httplib
43 41 urlerr = util.urlerr
44 42 urlreq = util.urlreq
45 43
46 44 def encodevalueinheaders(value, header, limit):
47 45 """Encode a string value into multiple HTTP headers.
48 46
49 47 ``value`` will be encoded into 1 or more HTTP headers with the names
50 48 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 49 name + value will be at most ``limit`` bytes long.
52 50
53 51 Returns an iterable of 2-tuples consisting of header names and
54 52 values as native strings.
55 53 """
56 54 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 55 # not bytes. This function always takes bytes in as arguments.
58 56 fmt = pycompat.strurl(header) + r'-%s'
59 57 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 58 # and not a unicode: we're just getting the encoded length anyway,
61 59 # and using an r-string to make it portable between Python 2 and 3
62 60 # doesn't work because then the \r is a literal backslash-r
63 61 # instead of a carriage return.
64 62 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 63 result = []
66 64
67 65 n = 0
68 66 for i in pycompat.xrange(0, len(value), valuelen):
69 67 n += 1
70 68 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 69
72 70 return result
73 71
74 72 def _wraphttpresponse(resp):
75 73 """Wrap an HTTPResponse with common error handlers.
76 74
77 75 This ensures that any I/O from any consumer raises the appropriate
78 76 error and messaging.
79 77 """
80 78 origread = resp.read
81 79
82 80 class readerproxy(resp.__class__):
83 81 def read(self, size=None):
84 82 try:
85 83 return origread(size)
86 84 except httplib.IncompleteRead as e:
87 85 # e.expected is an integer if length known or None otherwise.
88 86 if e.expected:
89 87 msg = _('HTTP request error (incomplete response; '
90 88 'expected %d bytes got %d)') % (e.expected,
91 89 len(e.partial))
92 90 else:
93 91 msg = _('HTTP request error (incomplete response)')
94 92
95 93 raise error.PeerTransportError(
96 94 msg,
97 95 hint=_('this may be an intermittent network failure; '
98 96 'if the error persists, consider contacting the '
99 97 'network or server operator'))
100 98 except httplib.HTTPException as e:
101 99 raise error.PeerTransportError(
102 100 _('HTTP request error (%s)') % e,
103 101 hint=_('this may be an intermittent network failure; '
104 102 'if the error persists, consider contacting the '
105 103 'network or server operator'))
106 104
107 105 resp.__class__ = readerproxy
108 106
109 107 class _multifile(object):
110 108 def __init__(self, *fileobjs):
111 109 for f in fileobjs:
112 110 if not util.safehasattr(f, 'length'):
113 111 raise ValueError(
114 112 '_multifile only supports file objects that '
115 113 'have a length but this one does not:', type(f), f)
116 114 self._fileobjs = fileobjs
117 115 self._index = 0
118 116
119 117 @property
120 118 def length(self):
121 119 return sum(f.length for f in self._fileobjs)
122 120
123 121 def read(self, amt=None):
124 122 if amt <= 0:
125 123 return ''.join(f.read() for f in self._fileobjs)
126 124 parts = []
127 125 while amt and self._index < len(self._fileobjs):
128 126 parts.append(self._fileobjs[self._index].read(amt))
129 127 got = len(parts[-1])
130 128 if got < amt:
131 129 self._index += 1
132 130 amt -= got
133 131 return ''.join(parts)
134 132
135 133 def seek(self, offset, whence=os.SEEK_SET):
136 134 if whence != os.SEEK_SET:
137 135 raise NotImplementedError(
138 136 '_multifile does not support anything other'
139 137 ' than os.SEEK_SET for whence on seek()')
140 138 if offset != 0:
141 139 raise NotImplementedError(
142 140 '_multifile only supports seeking to start, but that '
143 141 'could be fixed if you need it')
144 142 for f in self._fileobjs:
145 143 f.seek(0)
146 144 self._index = 0
147 145
148 146 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 147 repobaseurl, cmd, args):
150 148 """Make an HTTP request to run a command for a version 1 client.
151 149
152 150 ``caps`` is a set of known server capabilities. The value may be
153 151 None if capabilities are not yet known.
154 152
155 153 ``capablefn`` is a function to evaluate a capability.
156 154
157 155 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 156 raw data to pass to it.
159 157 """
160 158 if cmd == 'pushkey':
161 159 args['data'] = ''
162 160 data = args.pop('data', None)
163 161 headers = args.pop('headers', {})
164 162
165 163 ui.debug("sending %s command\n" % cmd)
166 164 q = [('cmd', cmd)]
167 165 headersize = 0
168 166 # Important: don't use self.capable() here or else you end up
169 167 # with infinite recursion when trying to look up capabilities
170 168 # for the first time.
171 169 postargsok = caps is not None and 'httppostargs' in caps
172 170
173 171 # Send arguments via POST.
174 172 if postargsok and args:
175 173 strargs = urlreq.urlencode(sorted(args.items()))
176 174 if not data:
177 175 data = strargs
178 176 else:
179 177 if isinstance(data, bytes):
180 178 i = io.BytesIO(data)
181 179 i.length = len(data)
182 180 data = i
183 181 argsio = io.BytesIO(strargs)
184 182 argsio.length = len(strargs)
185 183 data = _multifile(argsio, data)
186 184 headers[r'X-HgArgs-Post'] = len(strargs)
187 185 elif args:
188 186 # Calling self.capable() can infinite loop if we are calling
189 187 # "capabilities". But that command should never accept wire
190 188 # protocol arguments. So this should never happen.
191 189 assert cmd != 'capabilities'
192 190 httpheader = capablefn('httpheader')
193 191 if httpheader:
194 192 headersize = int(httpheader.split(',', 1)[0])
195 193
196 194 # Send arguments via HTTP headers.
197 195 if headersize > 0:
198 196 # The headers can typically carry more data than the URL.
199 197 encargs = urlreq.urlencode(sorted(args.items()))
200 198 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 199 headersize):
202 200 headers[header] = value
203 201 # Send arguments via query string (Mercurial <1.9).
204 202 else:
205 203 q += sorted(args.items())
206 204
207 205 qs = '?%s' % urlreq.urlencode(q)
208 206 cu = "%s%s" % (repobaseurl, qs)
209 207 size = 0
210 208 if util.safehasattr(data, 'length'):
211 209 size = data.length
212 210 elif data is not None:
213 211 size = len(data)
214 212 if data is not None and r'Content-Type' not in headers:
215 213 headers[r'Content-Type'] = r'application/mercurial-0.1'
216 214
217 215 # Tell the server we accept application/mercurial-0.2 and multiple
218 216 # compression formats if the server is capable of emitting those
219 217 # payloads.
220 218 # Note: Keep this set empty by default, as client advertisement of
221 219 # protocol parameters should only occur after the handshake.
222 220 protoparams = set()
223 221
224 222 mediatypes = set()
225 223 if caps is not None:
226 224 mt = capablefn('httpmediatype')
227 225 if mt:
228 226 protoparams.add('0.1')
229 227 mediatypes = set(mt.split(','))
230 228
231 229 protoparams.add('partial-pull')
232 230
233 231 if '0.2tx' in mediatypes:
234 232 protoparams.add('0.2')
235 233
236 234 if '0.2tx' in mediatypes and capablefn('compression'):
237 235 # We /could/ compare supported compression formats and prune
238 236 # non-mutually supported or error if nothing is mutually supported.
239 237 # For now, send the full list to the server and have it error.
240 238 comps = [e.wireprotosupport().name for e in
241 239 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 240 protoparams.add('comp=%s' % ','.join(comps))
243 241
244 242 if protoparams:
245 243 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 244 'X-HgProto',
247 245 headersize or 1024)
248 246 for header, value in protoheaders:
249 247 headers[header] = value
250 248
251 249 varyheaders = []
252 250 for header in headers:
253 251 if header.lower().startswith(r'x-hg'):
254 252 varyheaders.append(header)
255 253
256 254 if varyheaders:
257 255 headers[r'Vary'] = r','.join(sorted(varyheaders))
258 256
259 257 req = requestbuilder(pycompat.strurl(cu), data, headers)
260 258
261 259 if data is not None:
262 260 ui.debug("sending %d bytes\n" % size)
263 261 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264 262
265 263 return req, cu, qs
266 264
267 265 def _reqdata(req):
268 266 """Get request data, if any. If no data, returns None."""
269 267 if pycompat.ispy3:
270 268 return req.data
271 269 if not req.has_data():
272 270 return None
273 271 return req.get_data()
274 272
275 273 def sendrequest(ui, opener, req):
276 274 """Send a prepared HTTP request.
277 275
278 276 Returns the response object.
279 277 """
280 278 dbg = ui.debug
281 279 if (ui.debugflag
282 280 and ui.configbool('devel', 'debug.peer-request')):
283 281 line = 'devel-peer-request: %s\n'
284 282 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
285 283 pycompat.bytesurl(req.get_full_url())))
286 284 hgargssize = None
287 285
288 286 for header, value in sorted(req.header_items()):
289 287 header = pycompat.bytesurl(header)
290 288 value = pycompat.bytesurl(value)
291 289 if header.startswith('X-hgarg-'):
292 290 if hgargssize is None:
293 291 hgargssize = 0
294 292 hgargssize += len(value)
295 293 else:
296 294 dbg(line % ' %s %s' % (header, value))
297 295
298 296 if hgargssize is not None:
299 297 dbg(line % ' %d bytes of commands arguments in headers'
300 298 % hgargssize)
301 299 data = _reqdata(req)
302 300 if data is not None:
303 301 length = getattr(data, 'length', None)
304 302 if length is None:
305 303 length = len(data)
306 304 dbg(line % ' %d bytes of data' % length)
307 305
308 306 start = util.timer()
309 307
310 308 res = None
311 309 try:
312 310 res = opener.open(req)
313 311 except urlerr.httperror as inst:
314 312 if inst.code == 401:
315 313 raise error.Abort(_('authorization failed'))
316 314 raise
317 315 except httplib.HTTPException as inst:
318 316 ui.debug('http error requesting %s\n' %
319 317 util.hidepassword(req.get_full_url()))
320 318 ui.traceback()
321 319 raise IOError(None, inst)
322 320 finally:
323 321 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
324 322 code = res.code if res else -1
325 323 dbg(line % ' finished in %.4f seconds (%d)'
326 324 % (util.timer() - start, code))
327 325
328 326 # Insert error handlers for common I/O failures.
329 327 _wraphttpresponse(res)
330 328
331 329 return res
332 330
333 331 class RedirectedRepoError(error.RepoError):
334 332 def __init__(self, msg, respurl):
335 333 super(RedirectedRepoError, self).__init__(msg)
336 334 self.respurl = respurl
337 335
338 336 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
339 337 allowcbor=False):
340 338 # record the url we got redirected to
341 339 redirected = False
342 340 respurl = pycompat.bytesurl(resp.geturl())
343 341 if respurl.endswith(qs):
344 342 respurl = respurl[:-len(qs)]
345 343 qsdropped = False
346 344 else:
347 345 qsdropped = True
348 346
349 347 if baseurl.rstrip('/') != respurl.rstrip('/'):
350 348 redirected = True
351 349 if not ui.quiet:
352 350 ui.warn(_('real URL is %s\n') % respurl)
353 351
354 352 try:
355 353 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
356 354 except AttributeError:
357 355 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
358 356
359 357 safeurl = util.hidepassword(baseurl)
360 358 if proto.startswith('application/hg-error'):
361 359 raise error.OutOfBandError(resp.read())
362 360
363 361 # Pre 1.0 versions of Mercurial used text/plain and
364 362 # application/hg-changegroup. We don't support such old servers.
365 363 if not proto.startswith('application/mercurial-'):
366 364 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
367 365 msg = _("'%s' does not appear to be an hg repository:\n"
368 366 "---%%<--- (%s)\n%s\n---%%<---\n") % (
369 367 safeurl, proto or 'no content-type', resp.read(1024))
370 368
371 369 # Some servers may strip the query string from the redirect. We
372 370 # raise a special error type so callers can react to this specially.
373 371 if redirected and qsdropped:
374 372 raise RedirectedRepoError(msg, respurl)
375 373 else:
376 374 raise error.RepoError(msg)
377 375
378 376 try:
379 377 subtype = proto.split('-', 1)[1]
380 378
381 379 # Unless we end up supporting CBOR in the legacy wire protocol,
382 380 # this should ONLY be encountered for the initial capabilities
383 381 # request during handshake.
384 382 if subtype == 'cbor':
385 383 if allowcbor:
386 384 return respurl, proto, resp
387 385 else:
388 386 raise error.RepoError(_('unexpected CBOR response from '
389 387 'server'))
390 388
391 389 version_info = tuple([int(n) for n in subtype.split('.')])
392 390 except ValueError:
393 391 raise error.RepoError(_("'%s' sent a broken Content-Type "
394 392 "header (%s)") % (safeurl, proto))
395 393
396 394 # TODO consider switching to a decompression reader that uses
397 395 # generators.
398 396 if version_info == (0, 1):
399 397 if compressible:
400 398 resp = util.compengines['zlib'].decompressorreader(resp)
401 399
402 400 elif version_info == (0, 2):
403 401 # application/mercurial-0.2 always identifies the compression
404 402 # engine in the payload header.
405 403 elen = struct.unpack('B', resp.read(1))[0]
406 404 ename = resp.read(elen)
407 405 engine = util.compengines.forwiretype(ename)
408 406
409 407 resp = engine.decompressorreader(resp)
410 408 else:
411 409 raise error.RepoError(_("'%s' uses newer protocol %s") %
412 410 (safeurl, subtype))
413 411
414 412 return respurl, proto, resp
415 413
416 414 class httppeer(wireprotov1peer.wirepeer):
417 415 def __init__(self, ui, path, url, opener, requestbuilder, caps):
418 416 self.ui = ui
419 417 self._path = path
420 418 self._url = url
421 419 self._caps = caps
422 420 self._urlopener = opener
423 421 self._requestbuilder = requestbuilder
424 422
425 423 def __del__(self):
426 424 for h in self._urlopener.handlers:
427 425 h.close()
428 426 getattr(h, "close_all", lambda: None)()
429 427
430 428 # Begin of ipeerconnection interface.
431 429
432 430 def url(self):
433 431 return self._path
434 432
435 433 def local(self):
436 434 return None
437 435
438 436 def peer(self):
439 437 return self
440 438
441 439 def canpush(self):
442 440 return True
443 441
444 442 def close(self):
445 443 pass
446 444
447 445 # End of ipeerconnection interface.
448 446
449 447 # Begin of ipeercommands interface.
450 448
451 449 def capabilities(self):
452 450 return self._caps
453 451
454 452 # End of ipeercommands interface.
455 453
456 454 def _callstream(self, cmd, _compressible=False, **args):
457 455 args = pycompat.byteskwargs(args)
458 456
459 457 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
460 458 self._caps, self.capable,
461 459 self._url, cmd, args)
462 460
463 461 resp = sendrequest(self.ui, self._urlopener, req)
464 462
465 463 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
466 464 resp, _compressible)
467 465
468 466 return resp
469 467
470 468 def _call(self, cmd, **args):
471 469 fp = self._callstream(cmd, **args)
472 470 try:
473 471 return fp.read()
474 472 finally:
475 473 # if using keepalive, allow connection to be reused
476 474 fp.close()
477 475
478 476 def _callpush(self, cmd, cg, **args):
479 477 # have to stream bundle to a temp file because we do not have
480 478 # http 1.1 chunked transfer.
481 479
482 480 types = self.capable('unbundle')
483 481 try:
484 482 types = types.split(',')
485 483 except AttributeError:
486 484 # servers older than d1b16a746db6 will send 'unbundle' as a
487 485 # boolean capability. They only support headerless/uncompressed
488 486 # bundles.
489 487 types = [""]
490 488 for x in types:
491 489 if x in bundle2.bundletypes:
492 490 type = x
493 491 break
494 492
495 493 tempname = bundle2.writebundle(self.ui, cg, None, type)
496 494 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
497 495 headers = {r'Content-Type': r'application/mercurial-0.1'}
498 496
499 497 try:
500 498 r = self._call(cmd, data=fp, headers=headers, **args)
501 499 vals = r.split('\n', 1)
502 500 if len(vals) < 2:
503 501 raise error.ResponseError(_("unexpected response:"), r)
504 502 return vals
505 503 except urlerr.httperror:
506 504 # Catch and re-raise these so we don't try and treat them
507 505 # like generic socket errors. They lack any values in
508 506 # .args on Python 3 which breaks our socket.error block.
509 507 raise
510 508 except socket.error as err:
511 509 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
512 510 raise error.Abort(_('push failed: %s') % err.args[1])
513 511 raise error.Abort(err.args[1])
514 512 finally:
515 513 fp.close()
516 514 os.unlink(tempname)
517 515
518 516 def _calltwowaystream(self, cmd, fp, **args):
519 517 fh = None
520 518 fp_ = None
521 519 filename = None
522 520 try:
523 521 # dump bundle to disk
524 522 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
525 523 fh = os.fdopen(fd, r"wb")
526 524 d = fp.read(4096)
527 525 while d:
528 526 fh.write(d)
529 527 d = fp.read(4096)
530 528 fh.close()
531 529 # start http push
532 530 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
533 531 headers = {r'Content-Type': r'application/mercurial-0.1'}
534 532 return self._callstream(cmd, data=fp_, headers=headers, **args)
535 533 finally:
536 534 if fp_ is not None:
537 535 fp_.close()
538 536 if fh is not None:
539 537 fh.close()
540 538 os.unlink(filename)
541 539
542 540 def _callcompressable(self, cmd, **args):
543 541 return self._callstream(cmd, _compressible=True, **args)
544 542
545 543 def _abort(self, exception):
546 544 raise exception
547 545
548 546 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
549 547 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
550 548 buffersends=True)
551 549
552 550 handler = wireprotov2peer.clienthandler(ui, reactor)
553 551
554 552 url = '%s/%s' % (apiurl, permission)
555 553
556 554 if len(requests) > 1:
557 555 url += '/multirequest'
558 556 else:
559 557 url += '/%s' % requests[0][0]
560 558
561 559 ui.debug('sending %d commands\n' % len(requests))
562 560 for command, args, f in requests:
563 561 ui.debug('sending command %s: %s\n' % (
564 562 command, stringutil.pprint(args, indent=2)))
565 563 assert not list(handler.callcommand(command, args, f))
566 564
567 565 # TODO stream this.
568 566 body = b''.join(map(bytes, handler.flushcommands()))
569 567
570 568 # TODO modify user-agent to reflect v2
571 569 headers = {
572 570 r'Accept': wireprotov2server.FRAMINGTYPE,
573 571 r'Content-Type': wireprotov2server.FRAMINGTYPE,
574 572 }
575 573
576 574 req = requestbuilder(pycompat.strurl(url), body, headers)
577 575 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
578 576
579 577 try:
580 578 res = opener.open(req)
581 579 except urlerr.httperror as e:
582 580 if e.code == 401:
583 581 raise error.Abort(_('authorization failed'))
584 582
585 583 raise
586 584 except httplib.HTTPException as e:
587 585 ui.traceback()
588 586 raise IOError(None, e)
589 587
590 588 return handler, res
591 589
592 590 class queuedcommandfuture(pycompat.futures.Future):
593 591 """Wraps result() on command futures to trigger submission on call."""
594 592
595 593 def result(self, timeout=None):
596 594 if self.done():
597 595 return pycompat.futures.Future.result(self, timeout)
598 596
599 597 self._peerexecutor.sendcommands()
600 598
601 599 # sendcommands() will restore the original __class__ and self.result
602 600 # will resolve to Future.result.
603 601 return self.result(timeout)
604 602
605 603 @interfaceutil.implementer(repository.ipeercommandexecutor)
606 604 class httpv2executor(object):
607 605 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
608 606 self._ui = ui
609 607 self._opener = opener
610 608 self._requestbuilder = requestbuilder
611 609 self._apiurl = apiurl
612 610 self._descriptor = descriptor
613 611 self._sent = False
614 612 self._closed = False
615 613 self._neededpermissions = set()
616 614 self._calls = []
617 615 self._futures = weakref.WeakSet()
618 616 self._responseexecutor = None
619 617 self._responsef = None
620 618
621 619 def __enter__(self):
622 620 return self
623 621
624 622 def __exit__(self, exctype, excvalue, exctb):
625 623 self.close()
626 624
627 625 def callcommand(self, command, args):
628 626 if self._sent:
629 627 raise error.ProgrammingError('callcommand() cannot be used after '
630 628 'commands are sent')
631 629
632 630 if self._closed:
633 631 raise error.ProgrammingError('callcommand() cannot be used after '
634 632 'close()')
635 633
636 634 # The service advertises which commands are available. So if we attempt
637 635 # to call an unknown command or pass an unknown argument, we can screen
638 636 # for this.
639 637 if command not in self._descriptor['commands']:
640 638 raise error.ProgrammingError(
641 639 'wire protocol command %s is not available' % command)
642 640
643 641 cmdinfo = self._descriptor['commands'][command]
644 642 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
645 643
646 644 if unknownargs:
647 645 raise error.ProgrammingError(
648 646 'wire protocol command %s does not accept argument: %s' % (
649 647 command, ', '.join(sorted(unknownargs))))
650 648
651 649 self._neededpermissions |= set(cmdinfo['permissions'])
652 650
653 651 # TODO we /could/ also validate types here, since the API descriptor
654 652 # includes types...
655 653
656 654 f = pycompat.futures.Future()
657 655
658 656 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
659 657 # could deadlock.
660 658 f.__class__ = queuedcommandfuture
661 659 f._peerexecutor = self
662 660
663 661 self._futures.add(f)
664 662 self._calls.append((command, args, f))
665 663
666 664 return f
667 665
668 666 def sendcommands(self):
669 667 if self._sent:
670 668 return
671 669
672 670 if not self._calls:
673 671 return
674 672
675 673 self._sent = True
676 674
677 675 # Unhack any future types so caller sees a clean type and so we
678 676 # break reference cycle.
679 677 for f in self._futures:
680 678 if isinstance(f, queuedcommandfuture):
681 679 f.__class__ = pycompat.futures.Future
682 680 f._peerexecutor = None
683 681
684 682 # Mark the future as running and filter out cancelled futures.
685 683 calls = [(command, args, f)
686 684 for command, args, f in self._calls
687 685 if f.set_running_or_notify_cancel()]
688 686
689 687 # Clear out references, prevent improper object usage.
690 688 self._calls = None
691 689
692 690 if not calls:
693 691 return
694 692
695 693 permissions = set(self._neededpermissions)
696 694
697 695 if 'push' in permissions and 'pull' in permissions:
698 696 permissions.remove('pull')
699 697
700 698 if len(permissions) > 1:
701 699 raise error.RepoError(_('cannot make request requiring multiple '
702 700 'permissions: %s') %
703 701 _(', ').join(sorted(permissions)))
704 702
705 703 permission = {
706 704 'push': 'rw',
707 705 'pull': 'ro',
708 706 }[permissions.pop()]
709 707
710 708 handler, resp = sendv2request(
711 709 self._ui, self._opener, self._requestbuilder, self._apiurl,
712 710 permission, calls)
713 711
714 712 # TODO we probably want to validate the HTTP code, media type, etc.
715 713
716 714 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
717 715 self._responsef = self._responseexecutor.submit(self._handleresponse,
718 716 handler, resp)
719 717
720 718 def close(self):
721 719 if self._closed:
722 720 return
723 721
724 722 self.sendcommands()
725 723
726 724 self._closed = True
727 725
728 726 if not self._responsef:
729 727 return
730 728
731 729 # TODO ^C here may not result in immediate program termination.
732 730
733 731 try:
734 732 self._responsef.result()
735 733 finally:
736 734 self._responseexecutor.shutdown(wait=True)
737 735 self._responsef = None
738 736 self._responseexecutor = None
739 737
740 738 # If any of our futures are still in progress, mark them as
741 739 # errored, otherwise a result() could wait indefinitely.
742 740 for f in self._futures:
743 741 if not f.done():
744 742 f.set_exception(error.ResponseError(
745 743 _('unfulfilled command response')))
746 744
747 745 self._futures = None
748 746
749 747 def _handleresponse(self, handler, resp):
750 748 # Called in a thread to read the response.
751 749
752 750 while handler.readframe(resp):
753 751 pass
754 752
755 753 # TODO implement interface for version 2 peers
756 754 @interfaceutil.implementer(repository.ipeerconnection,
757 755 repository.ipeercapabilities,
758 756 repository.ipeerrequests)
759 757 class httpv2peer(object):
760 758 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
761 759 apidescriptor):
762 760 self.ui = ui
763 761
764 762 if repourl.endswith('/'):
765 763 repourl = repourl[:-1]
766 764
767 765 self._url = repourl
768 766 self._apipath = apipath
769 767 self._apiurl = '%s/%s' % (repourl, apipath)
770 768 self._opener = opener
771 769 self._requestbuilder = requestbuilder
772 770 self._descriptor = apidescriptor
773 771
774 772 # Start of ipeerconnection.
775 773
776 774 def url(self):
777 775 return self._url
778 776
779 777 def local(self):
780 778 return None
781 779
782 780 def peer(self):
783 781 return self
784 782
785 783 def canpush(self):
786 784 # TODO change once implemented.
787 785 return False
788 786
789 787 def close(self):
790 788 pass
791 789
792 790 # End of ipeerconnection.
793 791
794 792 # Start of ipeercapabilities.
795 793
796 794 def capable(self, name):
797 795 # The capabilities used internally historically map to capabilities
798 796 # advertised from the "capabilities" wire protocol command. However,
799 797 # version 2 of that command works differently.
800 798
801 799 # Maps to commands that are available.
802 800 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
803 801 return True
804 802
805 803 # Other concepts.
806 804 if name in ('bundle2',):
807 805 return True
808 806
809 807 return False
810 808
811 809 def requirecap(self, name, purpose):
812 810 if self.capable(name):
813 811 return
814 812
815 813 raise error.CapabilityError(
816 814 _('cannot %s; client or remote repository does not support the %r '
817 815 'capability') % (purpose, name))
818 816
819 817 # End of ipeercapabilities.
820 818
821 819 def _call(self, name, **args):
822 820 with self.commandexecutor() as e:
823 821 return e.callcommand(name, args).result()
824 822
825 823 def commandexecutor(self):
826 824 return httpv2executor(self.ui, self._opener, self._requestbuilder,
827 825 self._apiurl, self._descriptor)
828 826
829 827 # Registry of API service names to metadata about peers that handle it.
830 828 #
831 829 # The following keys are meaningful:
832 830 #
833 831 # init
834 832 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
835 833 # apidescriptor) to create a peer.
836 834 #
837 835 # priority
838 836 # Integer priority for the service. If we could choose from multiple
839 837 # services, we choose the one with the highest priority.
840 838 API_PEERS = {
841 839 wireprototypes.HTTP_WIREPROTO_V2: {
842 840 'init': httpv2peer,
843 841 'priority': 50,
844 842 },
845 843 }
846 844
847 845 def performhandshake(ui, url, opener, requestbuilder):
848 846 # The handshake is a request to the capabilities command.
849 847
850 848 caps = None
851 849 def capable(x):
852 850 raise error.ProgrammingError('should not be called')
853 851
854 852 args = {}
855 853
856 854 # The client advertises support for newer protocols by adding an
857 855 # X-HgUpgrade-* header with a list of supported APIs and an
858 856 # X-HgProto-* header advertising which serializing formats it supports.
859 857 # We only support the HTTP version 2 transport and CBOR responses for
860 858 # now.
861 859 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
862 860
863 861 if advertisev2:
864 862 args['headers'] = {
865 863 r'X-HgProto-1': r'cbor',
866 864 }
867 865
868 866 args['headers'].update(
869 867 encodevalueinheaders(' '.join(sorted(API_PEERS)),
870 868 'X-HgUpgrade',
871 869 # We don't know the header limit this early.
872 870 # So make it small.
873 871 1024))
874 872
875 873 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
876 874 capable, url, 'capabilities',
877 875 args)
878 876 resp = sendrequest(ui, opener, req)
879 877
880 878 # The server may redirect us to the repo root, stripping the
881 879 # ?cmd=capabilities query string from the URL. The server would likely
882 880 # return HTML in this case and ``parsev1commandresponse()`` would raise.
883 881 # We catch this special case and re-issue the capabilities request against
884 882 # the new URL.
885 883 #
886 884 # We should ideally not do this, as a redirect that drops the query
887 885 # string from the URL is arguably a server bug. (Garbage in, garbage out).
888 886 # However, Mercurial clients for several years appeared to handle this
889 887 # issue without behavior degradation. And according to issue 5860, it may
890 888 # be a longstanding bug in some server implementations. So we allow a
891 889 # redirect that drops the query string to "just work."
892 890 try:
893 891 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
894 892 compressible=False,
895 893 allowcbor=advertisev2)
896 894 except RedirectedRepoError as e:
897 895 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
898 896 capable, e.respurl,
899 897 'capabilities', args)
900 898 resp = sendrequest(ui, opener, req)
901 899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
902 900 compressible=False,
903 901 allowcbor=advertisev2)
904 902
905 903 try:
906 904 rawdata = resp.read()
907 905 finally:
908 906 resp.close()
909 907
910 908 if not ct.startswith('application/mercurial-'):
911 909 raise error.ProgrammingError('unexpected content-type: %s' % ct)
912 910
913 911 if advertisev2:
914 912 if ct == 'application/mercurial-cbor':
915 913 try:
916 info = cbor.loads(rawdata)
917 except cbor.CBORDecodeError:
914 info = cborutil.decodeall(rawdata)[0]
915 except cborutil.CBORDecodeError:
918 916 raise error.Abort(_('error decoding CBOR from remote server'),
919 917 hint=_('try again and consider contacting '
920 918 'the server operator'))
921 919
922 920 # We got a legacy response. That's fine.
923 921 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
924 922 info = {
925 923 'v1capabilities': set(rawdata.split())
926 924 }
927 925
928 926 else:
929 927 raise error.RepoError(
930 928 _('unexpected response type from server: %s') % ct)
931 929 else:
932 930 info = {
933 931 'v1capabilities': set(rawdata.split())
934 932 }
935 933
936 934 return respurl, info
937 935
938 936 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
939 937 """Construct an appropriate HTTP peer instance.
940 938
941 939 ``opener`` is an ``url.opener`` that should be used to establish
942 940 connections, perform HTTP requests.
943 941
944 942 ``requestbuilder`` is the type used for constructing HTTP requests.
945 943 It exists as an argument so extensions can override the default.
946 944 """
947 945 u = util.url(path)
948 946 if u.query or u.fragment:
949 947 raise error.Abort(_('unsupported URL component: "%s"') %
950 948 (u.query or u.fragment))
951 949
952 950 # urllib cannot handle URLs with embedded user or passwd.
953 951 url, authinfo = u.authinfo()
954 952 ui.debug('using %s\n' % url)
955 953
956 954 opener = opener or urlmod.opener(ui, authinfo)
957 955
958 956 respurl, info = performhandshake(ui, url, opener, requestbuilder)
959 957
960 958 # Given the intersection of APIs that both we and the server support,
961 959 # sort by their advertised priority and pick the first one.
962 960 #
963 961 # TODO consider making this request-based and interface driven. For
964 962 # example, the caller could say "I want a peer that does X." It's quite
965 963 # possible that not all peers would do that. Since we know the service
966 964 # capabilities, we could filter out services not meeting the
967 965 # requirements. Possibly by consulting the interfaces defined by the
968 966 # peer type.
969 967 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
970 968
971 969 preferredchoices = sorted(apipeerchoices,
972 970 key=lambda x: API_PEERS[x]['priority'],
973 971 reverse=True)
974 972
975 973 for service in preferredchoices:
976 974 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
977 975
978 976 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
979 977 requestbuilder,
980 978 info['apis'][service])
981 979
982 980 # Failed to construct an API peer. Fall back to legacy.
983 981 return httppeer(ui, path, respurl, opener, requestbuilder,
984 982 info['v1capabilities'])
985 983
986 984 def instance(ui, path, create, intents=None):
987 985 if create:
988 986 raise error.Abort(_('cannot create new http repository'))
989 987 try:
990 988 if path.startswith('https:') and not urlmod.has_https:
991 989 raise error.Abort(_('Python support for SSL and HTTPS '
992 990 'is not installed'))
993 991
994 992 inst = makepeer(ui, path)
995 993
996 994 return inst
997 995 except error.RepoError as httpexception:
998 996 try:
999 997 r = statichttprepo.instance(ui, "static-" + path, create)
1000 998 ui.note(_('(falling back to static-http)\n'))
1001 999 return r
1002 1000 except error.RepoError:
1003 1001 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now