##// END OF EJS Templates
httppeer: log commands for version 2 peer...
Gregory Szorc -
r39467:3fe028b6 default
parent child Browse files
Show More
@@ -1,997 +1,1001 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from .thirdparty import (
20 20 cbor,
21 21 )
22 22 from . import (
23 23 bundle2,
24 24 error,
25 25 httpconnection,
26 26 pycompat,
27 27 repository,
28 28 statichttprepo,
29 29 url as urlmod,
30 30 util,
31 31 wireprotoframing,
32 32 wireprototypes,
33 33 wireprotov1peer,
34 34 wireprotov2peer,
35 35 wireprotov2server,
36 36 )
37 37 from .utils import (
38 38 interfaceutil,
39 stringutil,
39 40 )
40 41
41 42 httplib = util.httplib
42 43 urlerr = util.urlerr
43 44 urlreq = util.urlreq
44 45
45 46 def encodevalueinheaders(value, header, limit):
46 47 """Encode a string value into multiple HTTP headers.
47 48
48 49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 51 name + value will be at most ``limit`` bytes long.
51 52
52 53 Returns an iterable of 2-tuples consisting of header names and
53 54 values as native strings.
54 55 """
55 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 57 # not bytes. This function always takes bytes in as arguments.
57 58 fmt = pycompat.strurl(header) + r'-%s'
58 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 60 # and not a unicode: we're just getting the encoded length anyway,
60 61 # and using an r-string to make it portable between Python 2 and 3
61 62 # doesn't work because then the \r is a literal backslash-r
62 63 # instead of a carriage return.
63 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 65 result = []
65 66
66 67 n = 0
67 68 for i in pycompat.xrange(0, len(value), valuelen):
68 69 n += 1
69 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 71
71 72 return result
72 73
73 74 def _wraphttpresponse(resp):
74 75 """Wrap an HTTPResponse with common error handlers.
75 76
76 77 This ensures that any I/O from any consumer raises the appropriate
77 78 error and messaging.
78 79 """
79 80 origread = resp.read
80 81
81 82 class readerproxy(resp.__class__):
82 83 def read(self, size=None):
83 84 try:
84 85 return origread(size)
85 86 except httplib.IncompleteRead as e:
86 87 # e.expected is an integer if length known or None otherwise.
87 88 if e.expected:
88 89 msg = _('HTTP request error (incomplete response; '
89 90 'expected %d bytes got %d)') % (e.expected,
90 91 len(e.partial))
91 92 else:
92 93 msg = _('HTTP request error (incomplete response)')
93 94
94 95 raise error.PeerTransportError(
95 96 msg,
96 97 hint=_('this may be an intermittent network failure; '
97 98 'if the error persists, consider contacting the '
98 99 'network or server operator'))
99 100 except httplib.HTTPException as e:
100 101 raise error.PeerTransportError(
101 102 _('HTTP request error (%s)') % e,
102 103 hint=_('this may be an intermittent network failure; '
103 104 'if the error persists, consider contacting the '
104 105 'network or server operator'))
105 106
106 107 resp.__class__ = readerproxy
107 108
108 109 class _multifile(object):
109 110 def __init__(self, *fileobjs):
110 111 for f in fileobjs:
111 112 if not util.safehasattr(f, 'length'):
112 113 raise ValueError(
113 114 '_multifile only supports file objects that '
114 115 'have a length but this one does not:', type(f), f)
115 116 self._fileobjs = fileobjs
116 117 self._index = 0
117 118
118 119 @property
119 120 def length(self):
120 121 return sum(f.length for f in self._fileobjs)
121 122
122 123 def read(self, amt=None):
123 124 if amt <= 0:
124 125 return ''.join(f.read() for f in self._fileobjs)
125 126 parts = []
126 127 while amt and self._index < len(self._fileobjs):
127 128 parts.append(self._fileobjs[self._index].read(amt))
128 129 got = len(parts[-1])
129 130 if got < amt:
130 131 self._index += 1
131 132 amt -= got
132 133 return ''.join(parts)
133 134
134 135 def seek(self, offset, whence=os.SEEK_SET):
135 136 if whence != os.SEEK_SET:
136 137 raise NotImplementedError(
137 138 '_multifile does not support anything other'
138 139 ' than os.SEEK_SET for whence on seek()')
139 140 if offset != 0:
140 141 raise NotImplementedError(
141 142 '_multifile only supports seeking to start, but that '
142 143 'could be fixed if you need it')
143 144 for f in self._fileobjs:
144 145 f.seek(0)
145 146 self._index = 0
146 147
147 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 149 repobaseurl, cmd, args):
149 150 """Make an HTTP request to run a command for a version 1 client.
150 151
151 152 ``caps`` is a set of known server capabilities. The value may be
152 153 None if capabilities are not yet known.
153 154
154 155 ``capablefn`` is a function to evaluate a capability.
155 156
156 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 158 raw data to pass to it.
158 159 """
159 160 if cmd == 'pushkey':
160 161 args['data'] = ''
161 162 data = args.pop('data', None)
162 163 headers = args.pop('headers', {})
163 164
164 165 ui.debug("sending %s command\n" % cmd)
165 166 q = [('cmd', cmd)]
166 167 headersize = 0
167 168 # Important: don't use self.capable() here or else you end up
168 169 # with infinite recursion when trying to look up capabilities
169 170 # for the first time.
170 171 postargsok = caps is not None and 'httppostargs' in caps
171 172
172 173 # Send arguments via POST.
173 174 if postargsok and args:
174 175 strargs = urlreq.urlencode(sorted(args.items()))
175 176 if not data:
176 177 data = strargs
177 178 else:
178 179 if isinstance(data, bytes):
179 180 i = io.BytesIO(data)
180 181 i.length = len(data)
181 182 data = i
182 183 argsio = io.BytesIO(strargs)
183 184 argsio.length = len(strargs)
184 185 data = _multifile(argsio, data)
185 186 headers[r'X-HgArgs-Post'] = len(strargs)
186 187 elif args:
187 188 # Calling self.capable() can infinite loop if we are calling
188 189 # "capabilities". But that command should never accept wire
189 190 # protocol arguments. So this should never happen.
190 191 assert cmd != 'capabilities'
191 192 httpheader = capablefn('httpheader')
192 193 if httpheader:
193 194 headersize = int(httpheader.split(',', 1)[0])
194 195
195 196 # Send arguments via HTTP headers.
196 197 if headersize > 0:
197 198 # The headers can typically carry more data than the URL.
198 199 encargs = urlreq.urlencode(sorted(args.items()))
199 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 201 headersize):
201 202 headers[header] = value
202 203 # Send arguments via query string (Mercurial <1.9).
203 204 else:
204 205 q += sorted(args.items())
205 206
206 207 qs = '?%s' % urlreq.urlencode(q)
207 208 cu = "%s%s" % (repobaseurl, qs)
208 209 size = 0
209 210 if util.safehasattr(data, 'length'):
210 211 size = data.length
211 212 elif data is not None:
212 213 size = len(data)
213 214 if data is not None and r'Content-Type' not in headers:
214 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 216
216 217 # Tell the server we accept application/mercurial-0.2 and multiple
217 218 # compression formats if the server is capable of emitting those
218 219 # payloads.
219 220 # Note: Keep this set empty by default, as client advertisement of
220 221 # protocol parameters should only occur after the handshake.
221 222 protoparams = set()
222 223
223 224 mediatypes = set()
224 225 if caps is not None:
225 226 mt = capablefn('httpmediatype')
226 227 if mt:
227 228 protoparams.add('0.1')
228 229 mediatypes = set(mt.split(','))
229 230
230 231 protoparams.add('partial-pull')
231 232
232 233 if '0.2tx' in mediatypes:
233 234 protoparams.add('0.2')
234 235
235 236 if '0.2tx' in mediatypes and capablefn('compression'):
236 237 # We /could/ compare supported compression formats and prune
237 238 # non-mutually supported or error if nothing is mutually supported.
238 239 # For now, send the full list to the server and have it error.
239 240 comps = [e.wireprotosupport().name for e in
240 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 242 protoparams.add('comp=%s' % ','.join(comps))
242 243
243 244 if protoparams:
244 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 246 'X-HgProto',
246 247 headersize or 1024)
247 248 for header, value in protoheaders:
248 249 headers[header] = value
249 250
250 251 varyheaders = []
251 252 for header in headers:
252 253 if header.lower().startswith(r'x-hg'):
253 254 varyheaders.append(header)
254 255
255 256 if varyheaders:
256 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 258
258 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 260
260 261 if data is not None:
261 262 ui.debug("sending %d bytes\n" % size)
262 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 264
264 265 return req, cu, qs
265 266
266 267 def _reqdata(req):
267 268 """Get request data, if any. If no data, returns None."""
268 269 if pycompat.ispy3:
269 270 return req.data
270 271 if not req.has_data():
271 272 return None
272 273 return req.get_data()
273 274
274 275 def sendrequest(ui, opener, req):
275 276 """Send a prepared HTTP request.
276 277
277 278 Returns the response object.
278 279 """
279 280 dbg = ui.debug
280 281 if (ui.debugflag
281 282 and ui.configbool('devel', 'debug.peer-request')):
282 283 line = 'devel-peer-request: %s\n'
283 284 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
284 285 pycompat.bytesurl(req.get_full_url())))
285 286 hgargssize = None
286 287
287 288 for header, value in sorted(req.header_items()):
288 289 header = pycompat.bytesurl(header)
289 290 value = pycompat.bytesurl(value)
290 291 if header.startswith('X-hgarg-'):
291 292 if hgargssize is None:
292 293 hgargssize = 0
293 294 hgargssize += len(value)
294 295 else:
295 296 dbg(line % ' %s %s' % (header, value))
296 297
297 298 if hgargssize is not None:
298 299 dbg(line % ' %d bytes of commands arguments in headers'
299 300 % hgargssize)
300 301 data = _reqdata(req)
301 302 if data is not None:
302 303 length = getattr(data, 'length', None)
303 304 if length is None:
304 305 length = len(data)
305 306 dbg(line % ' %d bytes of data' % length)
306 307
307 308 start = util.timer()
308 309
309 310 res = None
310 311 try:
311 312 res = opener.open(req)
312 313 except urlerr.httperror as inst:
313 314 if inst.code == 401:
314 315 raise error.Abort(_('authorization failed'))
315 316 raise
316 317 except httplib.HTTPException as inst:
317 318 ui.debug('http error requesting %s\n' %
318 319 util.hidepassword(req.get_full_url()))
319 320 ui.traceback()
320 321 raise IOError(None, inst)
321 322 finally:
322 323 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
323 324 code = res.code if res else -1
324 325 dbg(line % ' finished in %.4f seconds (%d)'
325 326 % (util.timer() - start, code))
326 327
327 328 # Insert error handlers for common I/O failures.
328 329 _wraphttpresponse(res)
329 330
330 331 return res
331 332
332 333 class RedirectedRepoError(error.RepoError):
333 334 def __init__(self, msg, respurl):
334 335 super(RedirectedRepoError, self).__init__(msg)
335 336 self.respurl = respurl
336 337
337 338 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
338 339 allowcbor=False):
339 340 # record the url we got redirected to
340 341 redirected = False
341 342 respurl = pycompat.bytesurl(resp.geturl())
342 343 if respurl.endswith(qs):
343 344 respurl = respurl[:-len(qs)]
344 345 qsdropped = False
345 346 else:
346 347 qsdropped = True
347 348
348 349 if baseurl.rstrip('/') != respurl.rstrip('/'):
349 350 redirected = True
350 351 if not ui.quiet:
351 352 ui.warn(_('real URL is %s\n') % respurl)
352 353
353 354 try:
354 355 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
355 356 except AttributeError:
356 357 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
357 358
358 359 safeurl = util.hidepassword(baseurl)
359 360 if proto.startswith('application/hg-error'):
360 361 raise error.OutOfBandError(resp.read())
361 362
362 363 # Pre 1.0 versions of Mercurial used text/plain and
363 364 # application/hg-changegroup. We don't support such old servers.
364 365 if not proto.startswith('application/mercurial-'):
365 366 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
366 367 msg = _("'%s' does not appear to be an hg repository:\n"
367 368 "---%%<--- (%s)\n%s\n---%%<---\n") % (
368 369 safeurl, proto or 'no content-type', resp.read(1024))
369 370
370 371 # Some servers may strip the query string from the redirect. We
371 372 # raise a special error type so callers can react to this specially.
372 373 if redirected and qsdropped:
373 374 raise RedirectedRepoError(msg, respurl)
374 375 else:
375 376 raise error.RepoError(msg)
376 377
377 378 try:
378 379 subtype = proto.split('-', 1)[1]
379 380
380 381 # Unless we end up supporting CBOR in the legacy wire protocol,
381 382 # this should ONLY be encountered for the initial capabilities
382 383 # request during handshake.
383 384 if subtype == 'cbor':
384 385 if allowcbor:
385 386 return respurl, proto, resp
386 387 else:
387 388 raise error.RepoError(_('unexpected CBOR response from '
388 389 'server'))
389 390
390 391 version_info = tuple([int(n) for n in subtype.split('.')])
391 392 except ValueError:
392 393 raise error.RepoError(_("'%s' sent a broken Content-Type "
393 394 "header (%s)") % (safeurl, proto))
394 395
395 396 # TODO consider switching to a decompression reader that uses
396 397 # generators.
397 398 if version_info == (0, 1):
398 399 if compressible:
399 400 resp = util.compengines['zlib'].decompressorreader(resp)
400 401
401 402 elif version_info == (0, 2):
402 403 # application/mercurial-0.2 always identifies the compression
403 404 # engine in the payload header.
404 405 elen = struct.unpack('B', resp.read(1))[0]
405 406 ename = resp.read(elen)
406 407 engine = util.compengines.forwiretype(ename)
407 408
408 409 resp = engine.decompressorreader(resp)
409 410 else:
410 411 raise error.RepoError(_("'%s' uses newer protocol %s") %
411 412 (safeurl, subtype))
412 413
413 414 return respurl, proto, resp
414 415
415 416 class httppeer(wireprotov1peer.wirepeer):
416 417 def __init__(self, ui, path, url, opener, requestbuilder, caps):
417 418 self.ui = ui
418 419 self._path = path
419 420 self._url = url
420 421 self._caps = caps
421 422 self._urlopener = opener
422 423 self._requestbuilder = requestbuilder
423 424
424 425 def __del__(self):
425 426 for h in self._urlopener.handlers:
426 427 h.close()
427 428 getattr(h, "close_all", lambda: None)()
428 429
429 430 # Begin of ipeerconnection interface.
430 431
431 432 def url(self):
432 433 return self._path
433 434
434 435 def local(self):
435 436 return None
436 437
437 438 def peer(self):
438 439 return self
439 440
440 441 def canpush(self):
441 442 return True
442 443
443 444 def close(self):
444 445 pass
445 446
446 447 # End of ipeerconnection interface.
447 448
448 449 # Begin of ipeercommands interface.
449 450
450 451 def capabilities(self):
451 452 return self._caps
452 453
453 454 # End of ipeercommands interface.
454 455
455 456 def _callstream(self, cmd, _compressible=False, **args):
456 457 args = pycompat.byteskwargs(args)
457 458
458 459 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
459 460 self._caps, self.capable,
460 461 self._url, cmd, args)
461 462
462 463 resp = sendrequest(self.ui, self._urlopener, req)
463 464
464 465 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
465 466 resp, _compressible)
466 467
467 468 return resp
468 469
469 470 def _call(self, cmd, **args):
470 471 fp = self._callstream(cmd, **args)
471 472 try:
472 473 return fp.read()
473 474 finally:
474 475 # if using keepalive, allow connection to be reused
475 476 fp.close()
476 477
477 478 def _callpush(self, cmd, cg, **args):
478 479 # have to stream bundle to a temp file because we do not have
479 480 # http 1.1 chunked transfer.
480 481
481 482 types = self.capable('unbundle')
482 483 try:
483 484 types = types.split(',')
484 485 except AttributeError:
485 486 # servers older than d1b16a746db6 will send 'unbundle' as a
486 487 # boolean capability. They only support headerless/uncompressed
487 488 # bundles.
488 489 types = [""]
489 490 for x in types:
490 491 if x in bundle2.bundletypes:
491 492 type = x
492 493 break
493 494
494 495 tempname = bundle2.writebundle(self.ui, cg, None, type)
495 496 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
496 497 headers = {r'Content-Type': r'application/mercurial-0.1'}
497 498
498 499 try:
499 500 r = self._call(cmd, data=fp, headers=headers, **args)
500 501 vals = r.split('\n', 1)
501 502 if len(vals) < 2:
502 503 raise error.ResponseError(_("unexpected response:"), r)
503 504 return vals
504 505 except urlerr.httperror:
505 506 # Catch and re-raise these so we don't try and treat them
506 507 # like generic socket errors. They lack any values in
507 508 # .args on Python 3 which breaks our socket.error block.
508 509 raise
509 510 except socket.error as err:
510 511 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
511 512 raise error.Abort(_('push failed: %s') % err.args[1])
512 513 raise error.Abort(err.args[1])
513 514 finally:
514 515 fp.close()
515 516 os.unlink(tempname)
516 517
517 518 def _calltwowaystream(self, cmd, fp, **args):
518 519 fh = None
519 520 fp_ = None
520 521 filename = None
521 522 try:
522 523 # dump bundle to disk
523 524 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
524 525 fh = os.fdopen(fd, r"wb")
525 526 d = fp.read(4096)
526 527 while d:
527 528 fh.write(d)
528 529 d = fp.read(4096)
529 530 fh.close()
530 531 # start http push
531 532 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
532 533 headers = {r'Content-Type': r'application/mercurial-0.1'}
533 534 return self._callstream(cmd, data=fp_, headers=headers, **args)
534 535 finally:
535 536 if fp_ is not None:
536 537 fp_.close()
537 538 if fh is not None:
538 539 fh.close()
539 540 os.unlink(filename)
540 541
541 542 def _callcompressable(self, cmd, **args):
542 543 return self._callstream(cmd, _compressible=True, **args)
543 544
544 545 def _abort(self, exception):
545 546 raise exception
546 547
547 548 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
548 549 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
549 550 buffersends=True)
550 551
551 552 handler = wireprotov2peer.clienthandler(ui, reactor)
552 553
553 554 url = '%s/%s' % (apiurl, permission)
554 555
555 556 if len(requests) > 1:
556 557 url += '/multirequest'
557 558 else:
558 559 url += '/%s' % requests[0][0]
559 560
561 ui.debug('sending %d commands\n' % len(requests))
560 562 for command, args, f in requests:
563 ui.debug('sending command %s: %s\n' % (
564 command, stringutil.pprint(args, indent=2)))
561 565 assert not list(handler.callcommand(command, args, f))
562 566
563 567 # TODO stream this.
564 568 body = b''.join(map(bytes, handler.flushcommands()))
565 569
566 570 # TODO modify user-agent to reflect v2
567 571 headers = {
568 572 r'Accept': wireprotov2server.FRAMINGTYPE,
569 573 r'Content-Type': wireprotov2server.FRAMINGTYPE,
570 574 }
571 575
572 576 req = requestbuilder(pycompat.strurl(url), body, headers)
573 577 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
574 578
575 579 try:
576 580 res = opener.open(req)
577 581 except urlerr.httperror as e:
578 582 if e.code == 401:
579 583 raise error.Abort(_('authorization failed'))
580 584
581 585 raise
582 586 except httplib.HTTPException as e:
583 587 ui.traceback()
584 588 raise IOError(None, e)
585 589
586 590 return handler, res
587 591
588 592 class queuedcommandfuture(pycompat.futures.Future):
589 593 """Wraps result() on command futures to trigger submission on call."""
590 594
591 595 def result(self, timeout=None):
592 596 if self.done():
593 597 return pycompat.futures.Future.result(self, timeout)
594 598
595 599 self._peerexecutor.sendcommands()
596 600
597 601 # sendcommands() will restore the original __class__ and self.result
598 602 # will resolve to Future.result.
599 603 return self.result(timeout)
600 604
601 605 @interfaceutil.implementer(repository.ipeercommandexecutor)
602 606 class httpv2executor(object):
603 607 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
604 608 self._ui = ui
605 609 self._opener = opener
606 610 self._requestbuilder = requestbuilder
607 611 self._apiurl = apiurl
608 612 self._descriptor = descriptor
609 613 self._sent = False
610 614 self._closed = False
611 615 self._neededpermissions = set()
612 616 self._calls = []
613 617 self._futures = weakref.WeakSet()
614 618 self._responseexecutor = None
615 619 self._responsef = None
616 620
617 621 def __enter__(self):
618 622 return self
619 623
620 624 def __exit__(self, exctype, excvalue, exctb):
621 625 self.close()
622 626
623 627 def callcommand(self, command, args):
624 628 if self._sent:
625 629 raise error.ProgrammingError('callcommand() cannot be used after '
626 630 'commands are sent')
627 631
628 632 if self._closed:
629 633 raise error.ProgrammingError('callcommand() cannot be used after '
630 634 'close()')
631 635
632 636 # The service advertises which commands are available. So if we attempt
633 637 # to call an unknown command or pass an unknown argument, we can screen
634 638 # for this.
635 639 if command not in self._descriptor['commands']:
636 640 raise error.ProgrammingError(
637 641 'wire protocol command %s is not available' % command)
638 642
639 643 cmdinfo = self._descriptor['commands'][command]
640 644 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
641 645
642 646 if unknownargs:
643 647 raise error.ProgrammingError(
644 648 'wire protocol command %s does not accept argument: %s' % (
645 649 command, ', '.join(sorted(unknownargs))))
646 650
647 651 self._neededpermissions |= set(cmdinfo['permissions'])
648 652
649 653 # TODO we /could/ also validate types here, since the API descriptor
650 654 # includes types...
651 655
652 656 f = pycompat.futures.Future()
653 657
654 658 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
655 659 # could deadlock.
656 660 f.__class__ = queuedcommandfuture
657 661 f._peerexecutor = self
658 662
659 663 self._futures.add(f)
660 664 self._calls.append((command, args, f))
661 665
662 666 return f
663 667
664 668 def sendcommands(self):
665 669 if self._sent:
666 670 return
667 671
668 672 if not self._calls:
669 673 return
670 674
671 675 self._sent = True
672 676
673 677 # Unhack any future types so caller sees a clean type and so we
674 678 # break reference cycle.
675 679 for f in self._futures:
676 680 if isinstance(f, queuedcommandfuture):
677 681 f.__class__ = pycompat.futures.Future
678 682 f._peerexecutor = None
679 683
680 684 # Mark the future as running and filter out cancelled futures.
681 685 calls = [(command, args, f)
682 686 for command, args, f in self._calls
683 687 if f.set_running_or_notify_cancel()]
684 688
685 689 # Clear out references, prevent improper object usage.
686 690 self._calls = None
687 691
688 692 if not calls:
689 693 return
690 694
691 695 permissions = set(self._neededpermissions)
692 696
693 697 if 'push' in permissions and 'pull' in permissions:
694 698 permissions.remove('pull')
695 699
696 700 if len(permissions) > 1:
697 701 raise error.RepoError(_('cannot make request requiring multiple '
698 702 'permissions: %s') %
699 703 _(', ').join(sorted(permissions)))
700 704
701 705 permission = {
702 706 'push': 'rw',
703 707 'pull': 'ro',
704 708 }[permissions.pop()]
705 709
706 710 handler, resp = sendv2request(
707 711 self._ui, self._opener, self._requestbuilder, self._apiurl,
708 712 permission, calls)
709 713
710 714 # TODO we probably want to validate the HTTP code, media type, etc.
711 715
712 716 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
713 717 self._responsef = self._responseexecutor.submit(self._handleresponse,
714 718 handler, resp)
715 719
716 720 def close(self):
717 721 if self._closed:
718 722 return
719 723
720 724 self.sendcommands()
721 725
722 726 self._closed = True
723 727
724 728 if not self._responsef:
725 729 return
726 730
727 731 try:
728 732 self._responsef.result()
729 733 finally:
730 734 self._responseexecutor.shutdown(wait=True)
731 735 self._responsef = None
732 736 self._responseexecutor = None
733 737
734 738 # If any of our futures are still in progress, mark them as
735 739 # errored, otherwise a result() could wait indefinitely.
736 740 for f in self._futures:
737 741 if not f.done():
738 742 f.set_exception(error.ResponseError(
739 743 _('unfulfilled command response')))
740 744
741 745 self._futures = None
742 746
743 747 def _handleresponse(self, handler, resp):
744 748 # Called in a thread to read the response.
745 749
746 750 while handler.readframe(resp):
747 751 pass
748 752
749 753 # TODO implement interface for version 2 peers
750 754 @interfaceutil.implementer(repository.ipeerconnection,
751 755 repository.ipeercapabilities,
752 756 repository.ipeerrequests)
753 757 class httpv2peer(object):
754 758 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
755 759 apidescriptor):
756 760 self.ui = ui
757 761
758 762 if repourl.endswith('/'):
759 763 repourl = repourl[:-1]
760 764
761 765 self._url = repourl
762 766 self._apipath = apipath
763 767 self._apiurl = '%s/%s' % (repourl, apipath)
764 768 self._opener = opener
765 769 self._requestbuilder = requestbuilder
766 770 self._descriptor = apidescriptor
767 771
768 772 # Start of ipeerconnection.
769 773
770 774 def url(self):
771 775 return self._url
772 776
773 777 def local(self):
774 778 return None
775 779
776 780 def peer(self):
777 781 return self
778 782
779 783 def canpush(self):
780 784 # TODO change once implemented.
781 785 return False
782 786
783 787 def close(self):
784 788 pass
785 789
786 790 # End of ipeerconnection.
787 791
788 792 # Start of ipeercapabilities.
789 793
790 794 def capable(self, name):
791 795 # The capabilities used internally historically map to capabilities
792 796 # advertised from the "capabilities" wire protocol command. However,
793 797 # version 2 of that command works differently.
794 798
795 799 # Maps to commands that are available.
796 800 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
797 801 return True
798 802
799 803 # Other concepts.
800 804 if name in ('bundle2',):
801 805 return True
802 806
803 807 return False
804 808
805 809 def requirecap(self, name, purpose):
806 810 if self.capable(name):
807 811 return
808 812
809 813 raise error.CapabilityError(
810 814 _('cannot %s; client or remote repository does not support the %r '
811 815 'capability') % (purpose, name))
812 816
813 817 # End of ipeercapabilities.
814 818
815 819 def _call(self, name, **args):
816 820 with self.commandexecutor() as e:
817 821 return e.callcommand(name, args).result()
818 822
819 823 def commandexecutor(self):
820 824 return httpv2executor(self.ui, self._opener, self._requestbuilder,
821 825 self._apiurl, self._descriptor)
822 826
823 827 # Registry of API service names to metadata about peers that handle it.
824 828 #
825 829 # The following keys are meaningful:
826 830 #
827 831 # init
828 832 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
829 833 # apidescriptor) to create a peer.
830 834 #
831 835 # priority
832 836 # Integer priority for the service. If we could choose from multiple
833 837 # services, we choose the one with the highest priority.
834 838 API_PEERS = {
835 839 wireprototypes.HTTP_WIREPROTO_V2: {
836 840 'init': httpv2peer,
837 841 'priority': 50,
838 842 },
839 843 }
840 844
841 845 def performhandshake(ui, url, opener, requestbuilder):
842 846 # The handshake is a request to the capabilities command.
843 847
844 848 caps = None
845 849 def capable(x):
846 850 raise error.ProgrammingError('should not be called')
847 851
848 852 args = {}
849 853
850 854 # The client advertises support for newer protocols by adding an
851 855 # X-HgUpgrade-* header with a list of supported APIs and an
852 856 # X-HgProto-* header advertising which serializing formats it supports.
853 857 # We only support the HTTP version 2 transport and CBOR responses for
854 858 # now.
855 859 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
856 860
857 861 if advertisev2:
858 862 args['headers'] = {
859 863 r'X-HgProto-1': r'cbor',
860 864 }
861 865
862 866 args['headers'].update(
863 867 encodevalueinheaders(' '.join(sorted(API_PEERS)),
864 868 'X-HgUpgrade',
865 869 # We don't know the header limit this early.
866 870 # So make it small.
867 871 1024))
868 872
869 873 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
870 874 capable, url, 'capabilities',
871 875 args)
872 876 resp = sendrequest(ui, opener, req)
873 877
874 878 # The server may redirect us to the repo root, stripping the
875 879 # ?cmd=capabilities query string from the URL. The server would likely
876 880 # return HTML in this case and ``parsev1commandresponse()`` would raise.
877 881 # We catch this special case and re-issue the capabilities request against
878 882 # the new URL.
879 883 #
880 884 # We should ideally not do this, as a redirect that drops the query
881 885 # string from the URL is arguably a server bug. (Garbage in, garbage out).
882 886 # However, Mercurial clients for several years appeared to handle this
883 887 # issue without behavior degradation. And according to issue 5860, it may
884 888 # be a longstanding bug in some server implementations. So we allow a
885 889 # redirect that drops the query string to "just work."
886 890 try:
887 891 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
888 892 compressible=False,
889 893 allowcbor=advertisev2)
890 894 except RedirectedRepoError as e:
891 895 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
892 896 capable, e.respurl,
893 897 'capabilities', args)
894 898 resp = sendrequest(ui, opener, req)
895 899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
896 900 compressible=False,
897 901 allowcbor=advertisev2)
898 902
899 903 try:
900 904 rawdata = resp.read()
901 905 finally:
902 906 resp.close()
903 907
904 908 if not ct.startswith('application/mercurial-'):
905 909 raise error.ProgrammingError('unexpected content-type: %s' % ct)
906 910
907 911 if advertisev2:
908 912 if ct == 'application/mercurial-cbor':
909 913 try:
910 914 info = cbor.loads(rawdata)
911 915 except cbor.CBORDecodeError:
912 916 raise error.Abort(_('error decoding CBOR from remote server'),
913 917 hint=_('try again and consider contacting '
914 918 'the server operator'))
915 919
916 920 # We got a legacy response. That's fine.
917 921 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
918 922 info = {
919 923 'v1capabilities': set(rawdata.split())
920 924 }
921 925
922 926 else:
923 927 raise error.RepoError(
924 928 _('unexpected response type from server: %s') % ct)
925 929 else:
926 930 info = {
927 931 'v1capabilities': set(rawdata.split())
928 932 }
929 933
930 934 return respurl, info
931 935
932 936 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
933 937 """Construct an appropriate HTTP peer instance.
934 938
935 939 ``opener`` is an ``url.opener`` that should be used to establish
936 940 connections, perform HTTP requests.
937 941
938 942 ``requestbuilder`` is the type used for constructing HTTP requests.
939 943 It exists as an argument so extensions can override the default.
940 944 """
941 945 u = util.url(path)
942 946 if u.query or u.fragment:
943 947 raise error.Abort(_('unsupported URL component: "%s"') %
944 948 (u.query or u.fragment))
945 949
946 950 # urllib cannot handle URLs with embedded user or passwd.
947 951 url, authinfo = u.authinfo()
948 952 ui.debug('using %s\n' % url)
949 953
950 954 opener = opener or urlmod.opener(ui, authinfo)
951 955
952 956 respurl, info = performhandshake(ui, url, opener, requestbuilder)
953 957
954 958 # Given the intersection of APIs that both we and the server support,
955 959 # sort by their advertised priority and pick the first one.
956 960 #
957 961 # TODO consider making this request-based and interface driven. For
958 962 # example, the caller could say "I want a peer that does X." It's quite
959 963 # possible that not all peers would do that. Since we know the service
960 964 # capabilities, we could filter out services not meeting the
961 965 # requirements. Possibly by consulting the interfaces defined by the
962 966 # peer type.
963 967 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
964 968
965 969 preferredchoices = sorted(apipeerchoices,
966 970 key=lambda x: API_PEERS[x]['priority'],
967 971 reverse=True)
968 972
969 973 for service in preferredchoices:
970 974 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
971 975
972 976 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
973 977 requestbuilder,
974 978 info['apis'][service])
975 979
976 980 # Failed to construct an API peer. Fall back to legacy.
977 981 return httppeer(ui, path, respurl, opener, requestbuilder,
978 982 info['v1capabilities'])
979 983
980 984 def instance(ui, path, create, intents=None):
981 985 if create:
982 986 raise error.Abort(_('cannot create new http repository'))
983 987 try:
984 988 if path.startswith('https:') and not urlmod.has_https:
985 989 raise error.Abort(_('Python support for SSL and HTTPS '
986 990 'is not installed'))
987 991
988 992 inst = makepeer(ui, path)
989 993
990 994 return inst
991 995 except error.RepoError as httpexception:
992 996 try:
993 997 r = statichttprepo.instance(ui, "static-" + path, create)
994 998 ui.note(_('(falling back to static-http)\n'))
995 999 return r
996 1000 except error.RepoError:
997 1001 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now