##// END OF EJS Templates
httppeer: expose capabilities for each command...
Gregory Szorc -
r39664:a5de21c9 default
parent child Browse files
Show More
@@ -1,1002 +1,1006 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 repository,
25 25 statichttprepo,
26 26 url as urlmod,
27 27 util,
28 28 wireprotoframing,
29 29 wireprototypes,
30 30 wireprotov1peer,
31 31 wireprotov2peer,
32 32 wireprotov2server,
33 33 )
34 34 from .utils import (
35 35 cborutil,
36 36 interfaceutil,
37 37 stringutil,
38 38 )
39 39
40 40 httplib = util.httplib
41 41 urlerr = util.urlerr
42 42 urlreq = util.urlreq
43 43
44 44 def encodevalueinheaders(value, header, limit):
45 45 """Encode a string value into multiple HTTP headers.
46 46
47 47 ``value`` will be encoded into 1 or more HTTP headers with the names
48 48 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
49 49 name + value will be at most ``limit`` bytes long.
50 50
51 51 Returns an iterable of 2-tuples consisting of header names and
52 52 values as native strings.
53 53 """
54 54 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
55 55 # not bytes. This function always takes bytes in as arguments.
56 56 fmt = pycompat.strurl(header) + r'-%s'
57 57 # Note: it is *NOT* a bug that the last bit here is a bytestring
58 58 # and not a unicode: we're just getting the encoded length anyway,
59 59 # and using an r-string to make it portable between Python 2 and 3
60 60 # doesn't work because then the \r is a literal backslash-r
61 61 # instead of a carriage return.
62 62 valuelen = limit - len(fmt % r'000') - len(': \r\n')
63 63 result = []
64 64
65 65 n = 0
66 66 for i in pycompat.xrange(0, len(value), valuelen):
67 67 n += 1
68 68 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
69 69
70 70 return result
71 71
72 72 def _wraphttpresponse(resp):
73 73 """Wrap an HTTPResponse with common error handlers.
74 74
75 75 This ensures that any I/O from any consumer raises the appropriate
76 76 error and messaging.
77 77 """
78 78 origread = resp.read
79 79
80 80 class readerproxy(resp.__class__):
81 81 def read(self, size=None):
82 82 try:
83 83 return origread(size)
84 84 except httplib.IncompleteRead as e:
85 85 # e.expected is an integer if length known or None otherwise.
86 86 if e.expected:
87 87 got = len(e.partial)
88 88 total = e.expected + got
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (total, got)
91 91 else:
92 92 msg = _('HTTP request error (incomplete response)')
93 93
94 94 raise error.PeerTransportError(
95 95 msg,
96 96 hint=_('this may be an intermittent network failure; '
97 97 'if the error persists, consider contacting the '
98 98 'network or server operator'))
99 99 except httplib.HTTPException as e:
100 100 raise error.PeerTransportError(
101 101 _('HTTP request error (%s)') % e,
102 102 hint=_('this may be an intermittent network failure; '
103 103 'if the error persists, consider contacting the '
104 104 'network or server operator'))
105 105
106 106 resp.__class__ = readerproxy
107 107
108 108 class _multifile(object):
109 109 def __init__(self, *fileobjs):
110 110 for f in fileobjs:
111 111 if not util.safehasattr(f, 'length'):
112 112 raise ValueError(
113 113 '_multifile only supports file objects that '
114 114 'have a length but this one does not:', type(f), f)
115 115 self._fileobjs = fileobjs
116 116 self._index = 0
117 117
118 118 @property
119 119 def length(self):
120 120 return sum(f.length for f in self._fileobjs)
121 121
122 122 def read(self, amt=None):
123 123 if amt <= 0:
124 124 return ''.join(f.read() for f in self._fileobjs)
125 125 parts = []
126 126 while amt and self._index < len(self._fileobjs):
127 127 parts.append(self._fileobjs[self._index].read(amt))
128 128 got = len(parts[-1])
129 129 if got < amt:
130 130 self._index += 1
131 131 amt -= got
132 132 return ''.join(parts)
133 133
134 134 def seek(self, offset, whence=os.SEEK_SET):
135 135 if whence != os.SEEK_SET:
136 136 raise NotImplementedError(
137 137 '_multifile does not support anything other'
138 138 ' than os.SEEK_SET for whence on seek()')
139 139 if offset != 0:
140 140 raise NotImplementedError(
141 141 '_multifile only supports seeking to start, but that '
142 142 'could be fixed if you need it')
143 143 for f in self._fileobjs:
144 144 f.seek(0)
145 145 self._index = 0
146 146
147 147 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 148 repobaseurl, cmd, args):
149 149 """Make an HTTP request to run a command for a version 1 client.
150 150
151 151 ``caps`` is a set of known server capabilities. The value may be
152 152 None if capabilities are not yet known.
153 153
154 154 ``capablefn`` is a function to evaluate a capability.
155 155
156 156 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 157 raw data to pass to it.
158 158 """
159 159 if cmd == 'pushkey':
160 160 args['data'] = ''
161 161 data = args.pop('data', None)
162 162 headers = args.pop('headers', {})
163 163
164 164 ui.debug("sending %s command\n" % cmd)
165 165 q = [('cmd', cmd)]
166 166 headersize = 0
167 167 # Important: don't use self.capable() here or else you end up
168 168 # with infinite recursion when trying to look up capabilities
169 169 # for the first time.
170 170 postargsok = caps is not None and 'httppostargs' in caps
171 171
172 172 # Send arguments via POST.
173 173 if postargsok and args:
174 174 strargs = urlreq.urlencode(sorted(args.items()))
175 175 if not data:
176 176 data = strargs
177 177 else:
178 178 if isinstance(data, bytes):
179 179 i = io.BytesIO(data)
180 180 i.length = len(data)
181 181 data = i
182 182 argsio = io.BytesIO(strargs)
183 183 argsio.length = len(strargs)
184 184 data = _multifile(argsio, data)
185 185 headers[r'X-HgArgs-Post'] = len(strargs)
186 186 elif args:
187 187 # Calling self.capable() can infinite loop if we are calling
188 188 # "capabilities". But that command should never accept wire
189 189 # protocol arguments. So this should never happen.
190 190 assert cmd != 'capabilities'
191 191 httpheader = capablefn('httpheader')
192 192 if httpheader:
193 193 headersize = int(httpheader.split(',', 1)[0])
194 194
195 195 # Send arguments via HTTP headers.
196 196 if headersize > 0:
197 197 # The headers can typically carry more data than the URL.
198 198 encargs = urlreq.urlencode(sorted(args.items()))
199 199 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 200 headersize):
201 201 headers[header] = value
202 202 # Send arguments via query string (Mercurial <1.9).
203 203 else:
204 204 q += sorted(args.items())
205 205
206 206 qs = '?%s' % urlreq.urlencode(q)
207 207 cu = "%s%s" % (repobaseurl, qs)
208 208 size = 0
209 209 if util.safehasattr(data, 'length'):
210 210 size = data.length
211 211 elif data is not None:
212 212 size = len(data)
213 213 if data is not None and r'Content-Type' not in headers:
214 214 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 215
216 216 # Tell the server we accept application/mercurial-0.2 and multiple
217 217 # compression formats if the server is capable of emitting those
218 218 # payloads.
219 219 # Note: Keep this set empty by default, as client advertisement of
220 220 # protocol parameters should only occur after the handshake.
221 221 protoparams = set()
222 222
223 223 mediatypes = set()
224 224 if caps is not None:
225 225 mt = capablefn('httpmediatype')
226 226 if mt:
227 227 protoparams.add('0.1')
228 228 mediatypes = set(mt.split(','))
229 229
230 230 protoparams.add('partial-pull')
231 231
232 232 if '0.2tx' in mediatypes:
233 233 protoparams.add('0.2')
234 234
235 235 if '0.2tx' in mediatypes and capablefn('compression'):
236 236 # We /could/ compare supported compression formats and prune
237 237 # non-mutually supported or error if nothing is mutually supported.
238 238 # For now, send the full list to the server and have it error.
239 239 comps = [e.wireprotosupport().name for e in
240 240 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 241 protoparams.add('comp=%s' % ','.join(comps))
242 242
243 243 if protoparams:
244 244 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 245 'X-HgProto',
246 246 headersize or 1024)
247 247 for header, value in protoheaders:
248 248 headers[header] = value
249 249
250 250 varyheaders = []
251 251 for header in headers:
252 252 if header.lower().startswith(r'x-hg'):
253 253 varyheaders.append(header)
254 254
255 255 if varyheaders:
256 256 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 257
258 258 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 259
260 260 if data is not None:
261 261 ui.debug("sending %d bytes\n" % size)
262 262 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 263
264 264 return req, cu, qs
265 265
266 266 def _reqdata(req):
267 267 """Get request data, if any. If no data, returns None."""
268 268 if pycompat.ispy3:
269 269 return req.data
270 270 if not req.has_data():
271 271 return None
272 272 return req.get_data()
273 273
274 274 def sendrequest(ui, opener, req):
275 275 """Send a prepared HTTP request.
276 276
277 277 Returns the response object.
278 278 """
279 279 dbg = ui.debug
280 280 if (ui.debugflag
281 281 and ui.configbool('devel', 'debug.peer-request')):
282 282 line = 'devel-peer-request: %s\n'
283 283 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
284 284 pycompat.bytesurl(req.get_full_url())))
285 285 hgargssize = None
286 286
287 287 for header, value in sorted(req.header_items()):
288 288 header = pycompat.bytesurl(header)
289 289 value = pycompat.bytesurl(value)
290 290 if header.startswith('X-hgarg-'):
291 291 if hgargssize is None:
292 292 hgargssize = 0
293 293 hgargssize += len(value)
294 294 else:
295 295 dbg(line % ' %s %s' % (header, value))
296 296
297 297 if hgargssize is not None:
298 298 dbg(line % ' %d bytes of commands arguments in headers'
299 299 % hgargssize)
300 300 data = _reqdata(req)
301 301 if data is not None:
302 302 length = getattr(data, 'length', None)
303 303 if length is None:
304 304 length = len(data)
305 305 dbg(line % ' %d bytes of data' % length)
306 306
307 307 start = util.timer()
308 308
309 309 res = None
310 310 try:
311 311 res = opener.open(req)
312 312 except urlerr.httperror as inst:
313 313 if inst.code == 401:
314 314 raise error.Abort(_('authorization failed'))
315 315 raise
316 316 except httplib.HTTPException as inst:
317 317 ui.debug('http error requesting %s\n' %
318 318 util.hidepassword(req.get_full_url()))
319 319 ui.traceback()
320 320 raise IOError(None, inst)
321 321 finally:
322 322 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
323 323 code = res.code if res else -1
324 324 dbg(line % ' finished in %.4f seconds (%d)'
325 325 % (util.timer() - start, code))
326 326
327 327 # Insert error handlers for common I/O failures.
328 328 _wraphttpresponse(res)
329 329
330 330 return res
331 331
332 332 class RedirectedRepoError(error.RepoError):
333 333 def __init__(self, msg, respurl):
334 334 super(RedirectedRepoError, self).__init__(msg)
335 335 self.respurl = respurl
336 336
337 337 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
338 338 allowcbor=False):
339 339 # record the url we got redirected to
340 340 redirected = False
341 341 respurl = pycompat.bytesurl(resp.geturl())
342 342 if respurl.endswith(qs):
343 343 respurl = respurl[:-len(qs)]
344 344 qsdropped = False
345 345 else:
346 346 qsdropped = True
347 347
348 348 if baseurl.rstrip('/') != respurl.rstrip('/'):
349 349 redirected = True
350 350 if not ui.quiet:
351 351 ui.warn(_('real URL is %s\n') % respurl)
352 352
353 353 try:
354 354 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
355 355 except AttributeError:
356 356 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
357 357
358 358 safeurl = util.hidepassword(baseurl)
359 359 if proto.startswith('application/hg-error'):
360 360 raise error.OutOfBandError(resp.read())
361 361
362 362 # Pre 1.0 versions of Mercurial used text/plain and
363 363 # application/hg-changegroup. We don't support such old servers.
364 364 if not proto.startswith('application/mercurial-'):
365 365 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
366 366 msg = _("'%s' does not appear to be an hg repository:\n"
367 367 "---%%<--- (%s)\n%s\n---%%<---\n") % (
368 368 safeurl, proto or 'no content-type', resp.read(1024))
369 369
370 370 # Some servers may strip the query string from the redirect. We
371 371 # raise a special error type so callers can react to this specially.
372 372 if redirected and qsdropped:
373 373 raise RedirectedRepoError(msg, respurl)
374 374 else:
375 375 raise error.RepoError(msg)
376 376
377 377 try:
378 378 subtype = proto.split('-', 1)[1]
379 379
380 380 # Unless we end up supporting CBOR in the legacy wire protocol,
381 381 # this should ONLY be encountered for the initial capabilities
382 382 # request during handshake.
383 383 if subtype == 'cbor':
384 384 if allowcbor:
385 385 return respurl, proto, resp
386 386 else:
387 387 raise error.RepoError(_('unexpected CBOR response from '
388 388 'server'))
389 389
390 390 version_info = tuple([int(n) for n in subtype.split('.')])
391 391 except ValueError:
392 392 raise error.RepoError(_("'%s' sent a broken Content-Type "
393 393 "header (%s)") % (safeurl, proto))
394 394
395 395 # TODO consider switching to a decompression reader that uses
396 396 # generators.
397 397 if version_info == (0, 1):
398 398 if compressible:
399 399 resp = util.compengines['zlib'].decompressorreader(resp)
400 400
401 401 elif version_info == (0, 2):
402 402 # application/mercurial-0.2 always identifies the compression
403 403 # engine in the payload header.
404 404 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
405 405 ename = util.readexactly(resp, elen)
406 406 engine = util.compengines.forwiretype(ename)
407 407
408 408 resp = engine.decompressorreader(resp)
409 409 else:
410 410 raise error.RepoError(_("'%s' uses newer protocol %s") %
411 411 (safeurl, subtype))
412 412
413 413 return respurl, proto, resp
414 414
415 415 class httppeer(wireprotov1peer.wirepeer):
416 416 def __init__(self, ui, path, url, opener, requestbuilder, caps):
417 417 self.ui = ui
418 418 self._path = path
419 419 self._url = url
420 420 self._caps = caps
421 421 self._urlopener = opener
422 422 self._requestbuilder = requestbuilder
423 423
424 424 def __del__(self):
425 425 for h in self._urlopener.handlers:
426 426 h.close()
427 427 getattr(h, "close_all", lambda: None)()
428 428
429 429 # Begin of ipeerconnection interface.
430 430
431 431 def url(self):
432 432 return self._path
433 433
434 434 def local(self):
435 435 return None
436 436
437 437 def peer(self):
438 438 return self
439 439
440 440 def canpush(self):
441 441 return True
442 442
443 443 def close(self):
444 444 pass
445 445
446 446 # End of ipeerconnection interface.
447 447
448 448 # Begin of ipeercommands interface.
449 449
450 450 def capabilities(self):
451 451 return self._caps
452 452
453 453 # End of ipeercommands interface.
454 454
455 455 def _callstream(self, cmd, _compressible=False, **args):
456 456 args = pycompat.byteskwargs(args)
457 457
458 458 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
459 459 self._caps, self.capable,
460 460 self._url, cmd, args)
461 461
462 462 resp = sendrequest(self.ui, self._urlopener, req)
463 463
464 464 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
465 465 resp, _compressible)
466 466
467 467 return resp
468 468
469 469 def _call(self, cmd, **args):
470 470 fp = self._callstream(cmd, **args)
471 471 try:
472 472 return fp.read()
473 473 finally:
474 474 # if using keepalive, allow connection to be reused
475 475 fp.close()
476 476
477 477 def _callpush(self, cmd, cg, **args):
478 478 # have to stream bundle to a temp file because we do not have
479 479 # http 1.1 chunked transfer.
480 480
481 481 types = self.capable('unbundle')
482 482 try:
483 483 types = types.split(',')
484 484 except AttributeError:
485 485 # servers older than d1b16a746db6 will send 'unbundle' as a
486 486 # boolean capability. They only support headerless/uncompressed
487 487 # bundles.
488 488 types = [""]
489 489 for x in types:
490 490 if x in bundle2.bundletypes:
491 491 type = x
492 492 break
493 493
494 494 tempname = bundle2.writebundle(self.ui, cg, None, type)
495 495 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
496 496 headers = {r'Content-Type': r'application/mercurial-0.1'}
497 497
498 498 try:
499 499 r = self._call(cmd, data=fp, headers=headers, **args)
500 500 vals = r.split('\n', 1)
501 501 if len(vals) < 2:
502 502 raise error.ResponseError(_("unexpected response:"), r)
503 503 return vals
504 504 except urlerr.httperror:
505 505 # Catch and re-raise these so we don't try and treat them
506 506 # like generic socket errors. They lack any values in
507 507 # .args on Python 3 which breaks our socket.error block.
508 508 raise
509 509 except socket.error as err:
510 510 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
511 511 raise error.Abort(_('push failed: %s') % err.args[1])
512 512 raise error.Abort(err.args[1])
513 513 finally:
514 514 fp.close()
515 515 os.unlink(tempname)
516 516
517 517 def _calltwowaystream(self, cmd, fp, **args):
518 518 fh = None
519 519 fp_ = None
520 520 filename = None
521 521 try:
522 522 # dump bundle to disk
523 523 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
524 524 fh = os.fdopen(fd, r"wb")
525 525 d = fp.read(4096)
526 526 while d:
527 527 fh.write(d)
528 528 d = fp.read(4096)
529 529 fh.close()
530 530 # start http push
531 531 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
532 532 headers = {r'Content-Type': r'application/mercurial-0.1'}
533 533 return self._callstream(cmd, data=fp_, headers=headers, **args)
534 534 finally:
535 535 if fp_ is not None:
536 536 fp_.close()
537 537 if fh is not None:
538 538 fh.close()
539 539 os.unlink(filename)
540 540
541 541 def _callcompressable(self, cmd, **args):
542 542 return self._callstream(cmd, _compressible=True, **args)
543 543
544 544 def _abort(self, exception):
545 545 raise exception
546 546
547 547 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
548 548 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
549 549 buffersends=True)
550 550
551 551 handler = wireprotov2peer.clienthandler(ui, reactor)
552 552
553 553 url = '%s/%s' % (apiurl, permission)
554 554
555 555 if len(requests) > 1:
556 556 url += '/multirequest'
557 557 else:
558 558 url += '/%s' % requests[0][0]
559 559
560 560 ui.debug('sending %d commands\n' % len(requests))
561 561 for command, args, f in requests:
562 562 ui.debug('sending command %s: %s\n' % (
563 563 command, stringutil.pprint(args, indent=2)))
564 564 assert not list(handler.callcommand(command, args, f))
565 565
566 566 # TODO stream this.
567 567 body = b''.join(map(bytes, handler.flushcommands()))
568 568
569 569 # TODO modify user-agent to reflect v2
570 570 headers = {
571 571 r'Accept': wireprotov2server.FRAMINGTYPE,
572 572 r'Content-Type': wireprotov2server.FRAMINGTYPE,
573 573 }
574 574
575 575 req = requestbuilder(pycompat.strurl(url), body, headers)
576 576 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
577 577
578 578 try:
579 579 res = opener.open(req)
580 580 except urlerr.httperror as e:
581 581 if e.code == 401:
582 582 raise error.Abort(_('authorization failed'))
583 583
584 584 raise
585 585 except httplib.HTTPException as e:
586 586 ui.traceback()
587 587 raise IOError(None, e)
588 588
589 589 return handler, res
590 590
591 591 class queuedcommandfuture(pycompat.futures.Future):
592 592 """Wraps result() on command futures to trigger submission on call."""
593 593
594 594 def result(self, timeout=None):
595 595 if self.done():
596 596 return pycompat.futures.Future.result(self, timeout)
597 597
598 598 self._peerexecutor.sendcommands()
599 599
600 600 # sendcommands() will restore the original __class__ and self.result
601 601 # will resolve to Future.result.
602 602 return self.result(timeout)
603 603
604 604 @interfaceutil.implementer(repository.ipeercommandexecutor)
605 605 class httpv2executor(object):
606 606 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
607 607 self._ui = ui
608 608 self._opener = opener
609 609 self._requestbuilder = requestbuilder
610 610 self._apiurl = apiurl
611 611 self._descriptor = descriptor
612 612 self._sent = False
613 613 self._closed = False
614 614 self._neededpermissions = set()
615 615 self._calls = []
616 616 self._futures = weakref.WeakSet()
617 617 self._responseexecutor = None
618 618 self._responsef = None
619 619
620 620 def __enter__(self):
621 621 return self
622 622
623 623 def __exit__(self, exctype, excvalue, exctb):
624 624 self.close()
625 625
626 626 def callcommand(self, command, args):
627 627 if self._sent:
628 628 raise error.ProgrammingError('callcommand() cannot be used after '
629 629 'commands are sent')
630 630
631 631 if self._closed:
632 632 raise error.ProgrammingError('callcommand() cannot be used after '
633 633 'close()')
634 634
635 635 # The service advertises which commands are available. So if we attempt
636 636 # to call an unknown command or pass an unknown argument, we can screen
637 637 # for this.
638 638 if command not in self._descriptor['commands']:
639 639 raise error.ProgrammingError(
640 640 'wire protocol command %s is not available' % command)
641 641
642 642 cmdinfo = self._descriptor['commands'][command]
643 643 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
644 644
645 645 if unknownargs:
646 646 raise error.ProgrammingError(
647 647 'wire protocol command %s does not accept argument: %s' % (
648 648 command, ', '.join(sorted(unknownargs))))
649 649
650 650 self._neededpermissions |= set(cmdinfo['permissions'])
651 651
652 652 # TODO we /could/ also validate types here, since the API descriptor
653 653 # includes types...
654 654
655 655 f = pycompat.futures.Future()
656 656
657 657 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
658 658 # could deadlock.
659 659 f.__class__ = queuedcommandfuture
660 660 f._peerexecutor = self
661 661
662 662 self._futures.add(f)
663 663 self._calls.append((command, args, f))
664 664
665 665 return f
666 666
667 667 def sendcommands(self):
668 668 if self._sent:
669 669 return
670 670
671 671 if not self._calls:
672 672 return
673 673
674 674 self._sent = True
675 675
676 676 # Unhack any future types so caller sees a clean type and so we
677 677 # break reference cycle.
678 678 for f in self._futures:
679 679 if isinstance(f, queuedcommandfuture):
680 680 f.__class__ = pycompat.futures.Future
681 681 f._peerexecutor = None
682 682
683 683 # Mark the future as running and filter out cancelled futures.
684 684 calls = [(command, args, f)
685 685 for command, args, f in self._calls
686 686 if f.set_running_or_notify_cancel()]
687 687
688 688 # Clear out references, prevent improper object usage.
689 689 self._calls = None
690 690
691 691 if not calls:
692 692 return
693 693
694 694 permissions = set(self._neededpermissions)
695 695
696 696 if 'push' in permissions and 'pull' in permissions:
697 697 permissions.remove('pull')
698 698
699 699 if len(permissions) > 1:
700 700 raise error.RepoError(_('cannot make request requiring multiple '
701 701 'permissions: %s') %
702 702 _(', ').join(sorted(permissions)))
703 703
704 704 permission = {
705 705 'push': 'rw',
706 706 'pull': 'ro',
707 707 }[permissions.pop()]
708 708
709 709 handler, resp = sendv2request(
710 710 self._ui, self._opener, self._requestbuilder, self._apiurl,
711 711 permission, calls)
712 712
713 713 # TODO we probably want to validate the HTTP code, media type, etc.
714 714
715 715 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
716 716 self._responsef = self._responseexecutor.submit(self._handleresponse,
717 717 handler, resp)
718 718
719 719 def close(self):
720 720 if self._closed:
721 721 return
722 722
723 723 self.sendcommands()
724 724
725 725 self._closed = True
726 726
727 727 if not self._responsef:
728 728 return
729 729
730 730 # TODO ^C here may not result in immediate program termination.
731 731
732 732 try:
733 733 self._responsef.result()
734 734 finally:
735 735 self._responseexecutor.shutdown(wait=True)
736 736 self._responsef = None
737 737 self._responseexecutor = None
738 738
739 739 # If any of our futures are still in progress, mark them as
740 740 # errored, otherwise a result() could wait indefinitely.
741 741 for f in self._futures:
742 742 if not f.done():
743 743 f.set_exception(error.ResponseError(
744 744 _('unfulfilled command response')))
745 745
746 746 self._futures = None
747 747
748 748 def _handleresponse(self, handler, resp):
749 749 # Called in a thread to read the response.
750 750
751 751 while handler.readframe(resp):
752 752 pass
753 753
754 754 # TODO implement interface for version 2 peers
755 755 @interfaceutil.implementer(repository.ipeerconnection,
756 756 repository.ipeercapabilities,
757 757 repository.ipeerrequests)
758 758 class httpv2peer(object):
759 759 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
760 760 apidescriptor):
761 761 self.ui = ui
762 762
763 763 if repourl.endswith('/'):
764 764 repourl = repourl[:-1]
765 765
766 766 self._url = repourl
767 767 self._apipath = apipath
768 768 self._apiurl = '%s/%s' % (repourl, apipath)
769 769 self._opener = opener
770 770 self._requestbuilder = requestbuilder
771 771 self._descriptor = apidescriptor
772 772
773 773 # Start of ipeerconnection.
774 774
775 775 def url(self):
776 776 return self._url
777 777
778 778 def local(self):
779 779 return None
780 780
781 781 def peer(self):
782 782 return self
783 783
784 784 def canpush(self):
785 785 # TODO change once implemented.
786 786 return False
787 787
788 788 def close(self):
789 789 pass
790 790
791 791 # End of ipeerconnection.
792 792
793 793 # Start of ipeercapabilities.
794 794
795 795 def capable(self, name):
796 796 # The capabilities used internally historically map to capabilities
797 797 # advertised from the "capabilities" wire protocol command. However,
798 798 # version 2 of that command works differently.
799 799
800 800 # Maps to commands that are available.
801 801 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
802 802 return True
803 803
804 804 # Other concepts.
805 805 if name in ('bundle2',):
806 806 return True
807 807
808 # Alias command-* to presence of command of that name.
809 if name.startswith('command-'):
810 return name[len('command-'):] in self._descriptor['commands']
811
808 812 return False
809 813
810 814 def requirecap(self, name, purpose):
811 815 if self.capable(name):
812 816 return
813 817
814 818 raise error.CapabilityError(
815 819 _('cannot %s; client or remote repository does not support the %r '
816 820 'capability') % (purpose, name))
817 821
818 822 # End of ipeercapabilities.
819 823
820 824 def _call(self, name, **args):
821 825 with self.commandexecutor() as e:
822 826 return e.callcommand(name, args).result()
823 827
824 828 def commandexecutor(self):
825 829 return httpv2executor(self.ui, self._opener, self._requestbuilder,
826 830 self._apiurl, self._descriptor)
827 831
828 832 # Registry of API service names to metadata about peers that handle it.
829 833 #
830 834 # The following keys are meaningful:
831 835 #
832 836 # init
833 837 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
834 838 # apidescriptor) to create a peer.
835 839 #
836 840 # priority
837 841 # Integer priority for the service. If we could choose from multiple
838 842 # services, we choose the one with the highest priority.
839 843 API_PEERS = {
840 844 wireprototypes.HTTP_WIREPROTO_V2: {
841 845 'init': httpv2peer,
842 846 'priority': 50,
843 847 },
844 848 }
845 849
846 850 def performhandshake(ui, url, opener, requestbuilder):
847 851 # The handshake is a request to the capabilities command.
848 852
849 853 caps = None
850 854 def capable(x):
851 855 raise error.ProgrammingError('should not be called')
852 856
853 857 args = {}
854 858
855 859 # The client advertises support for newer protocols by adding an
856 860 # X-HgUpgrade-* header with a list of supported APIs and an
857 861 # X-HgProto-* header advertising which serializing formats it supports.
858 862 # We only support the HTTP version 2 transport and CBOR responses for
859 863 # now.
860 864 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
861 865
862 866 if advertisev2:
863 867 args['headers'] = {
864 868 r'X-HgProto-1': r'cbor',
865 869 }
866 870
867 871 args['headers'].update(
868 872 encodevalueinheaders(' '.join(sorted(API_PEERS)),
869 873 'X-HgUpgrade',
870 874 # We don't know the header limit this early.
871 875 # So make it small.
872 876 1024))
873 877
874 878 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
875 879 capable, url, 'capabilities',
876 880 args)
877 881 resp = sendrequest(ui, opener, req)
878 882
879 883 # The server may redirect us to the repo root, stripping the
880 884 # ?cmd=capabilities query string from the URL. The server would likely
881 885 # return HTML in this case and ``parsev1commandresponse()`` would raise.
882 886 # We catch this special case and re-issue the capabilities request against
883 887 # the new URL.
884 888 #
885 889 # We should ideally not do this, as a redirect that drops the query
886 890 # string from the URL is arguably a server bug. (Garbage in, garbage out).
887 891 # However, Mercurial clients for several years appeared to handle this
888 892 # issue without behavior degradation. And according to issue 5860, it may
889 893 # be a longstanding bug in some server implementations. So we allow a
890 894 # redirect that drops the query string to "just work."
891 895 try:
892 896 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
893 897 compressible=False,
894 898 allowcbor=advertisev2)
895 899 except RedirectedRepoError as e:
896 900 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
897 901 capable, e.respurl,
898 902 'capabilities', args)
899 903 resp = sendrequest(ui, opener, req)
900 904 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
901 905 compressible=False,
902 906 allowcbor=advertisev2)
903 907
904 908 try:
905 909 rawdata = resp.read()
906 910 finally:
907 911 resp.close()
908 912
909 913 if not ct.startswith('application/mercurial-'):
910 914 raise error.ProgrammingError('unexpected content-type: %s' % ct)
911 915
912 916 if advertisev2:
913 917 if ct == 'application/mercurial-cbor':
914 918 try:
915 919 info = cborutil.decodeall(rawdata)[0]
916 920 except cborutil.CBORDecodeError:
917 921 raise error.Abort(_('error decoding CBOR from remote server'),
918 922 hint=_('try again and consider contacting '
919 923 'the server operator'))
920 924
921 925 # We got a legacy response. That's fine.
922 926 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
923 927 info = {
924 928 'v1capabilities': set(rawdata.split())
925 929 }
926 930
927 931 else:
928 932 raise error.RepoError(
929 933 _('unexpected response type from server: %s') % ct)
930 934 else:
931 935 info = {
932 936 'v1capabilities': set(rawdata.split())
933 937 }
934 938
935 939 return respurl, info
936 940
937 941 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
938 942 """Construct an appropriate HTTP peer instance.
939 943
940 944 ``opener`` is an ``url.opener`` that should be used to establish
941 945 connections, perform HTTP requests.
942 946
943 947 ``requestbuilder`` is the type used for constructing HTTP requests.
944 948 It exists as an argument so extensions can override the default.
945 949 """
946 950 u = util.url(path)
947 951 if u.query or u.fragment:
948 952 raise error.Abort(_('unsupported URL component: "%s"') %
949 953 (u.query or u.fragment))
950 954
951 955 # urllib cannot handle URLs with embedded user or passwd.
952 956 url, authinfo = u.authinfo()
953 957 ui.debug('using %s\n' % url)
954 958
955 959 opener = opener or urlmod.opener(ui, authinfo)
956 960
957 961 respurl, info = performhandshake(ui, url, opener, requestbuilder)
958 962
959 963 # Given the intersection of APIs that both we and the server support,
960 964 # sort by their advertised priority and pick the first one.
961 965 #
962 966 # TODO consider making this request-based and interface driven. For
963 967 # example, the caller could say "I want a peer that does X." It's quite
964 968 # possible that not all peers would do that. Since we know the service
965 969 # capabilities, we could filter out services not meeting the
966 970 # requirements. Possibly by consulting the interfaces defined by the
967 971 # peer type.
968 972 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
969 973
970 974 preferredchoices = sorted(apipeerchoices,
971 975 key=lambda x: API_PEERS[x]['priority'],
972 976 reverse=True)
973 977
974 978 for service in preferredchoices:
975 979 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
976 980
977 981 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
978 982 requestbuilder,
979 983 info['apis'][service])
980 984
981 985 # Failed to construct an API peer. Fall back to legacy.
982 986 return httppeer(ui, path, respurl, opener, requestbuilder,
983 987 info['v1capabilities'])
984 988
985 989 def instance(ui, path, create, intents=None, createopts=None):
986 990 if create:
987 991 raise error.Abort(_('cannot create new http repository'))
988 992 try:
989 993 if path.startswith('https:') and not urlmod.has_https:
990 994 raise error.Abort(_('Python support for SSL and HTTPS '
991 995 'is not installed'))
992 996
993 997 inst = makepeer(ui, path)
994 998
995 999 return inst
996 1000 except error.RepoError as httpexception:
997 1001 try:
998 1002 r = statichttprepo.instance(ui, "static-" + path, create)
999 1003 ui.note(_('(falling back to static-http)\n'))
1000 1004 return r
1001 1005 except error.RepoError:
1002 1006 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now