##// END OF EJS Templates
httppeer: no matter what Python 3 might think, http headers are bytes...
Augie Fackler -
r37755:6cb7e3b9 default
parent child Browse files
Show More
@@ -1,951 +1,953
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17 import weakref
18 18
19 19 from .i18n import _
20 20 from .thirdparty import (
21 21 cbor,
22 22 )
23 23 from .thirdparty.zope import (
24 24 interface as zi,
25 25 )
26 26 from . import (
27 27 bundle2,
28 28 error,
29 29 httpconnection,
30 30 pycompat,
31 31 repository,
32 32 statichttprepo,
33 33 url as urlmod,
34 34 util,
35 35 wireprotoframing,
36 36 wireprototypes,
37 37 wireprotov1peer,
38 38 wireprotov2peer,
39 39 wireprotov2server,
40 40 )
41 41
42 42 httplib = util.httplib
43 43 urlerr = util.urlerr
44 44 urlreq = util.urlreq
45 45
46 46 def encodevalueinheaders(value, header, limit):
47 47 """Encode a string value into multiple HTTP headers.
48 48
49 49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 51 name + value will be at most ``limit`` bytes long.
52 52
53 53 Returns an iterable of 2-tuples consisting of header names and
54 54 values as native strings.
55 55 """
56 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 57 # not bytes. This function always takes bytes in as arguments.
58 58 fmt = pycompat.strurl(header) + r'-%s'
59 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 60 # and not a unicode: we're just getting the encoded length anyway,
61 61 # and using an r-string to make it portable between Python 2 and 3
62 62 # doesn't work because then the \r is a literal backslash-r
63 63 # instead of a carriage return.
64 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 65 result = []
66 66
67 67 n = 0
68 68 for i in xrange(0, len(value), valuelen):
69 69 n += 1
70 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 71
72 72 return result
73 73
74 74 def _wraphttpresponse(resp):
75 75 """Wrap an HTTPResponse with common error handlers.
76 76
77 77 This ensures that any I/O from any consumer raises the appropriate
78 78 error and messaging.
79 79 """
80 80 origread = resp.read
81 81
82 82 class readerproxy(resp.__class__):
83 83 def read(self, size=None):
84 84 try:
85 85 return origread(size)
86 86 except httplib.IncompleteRead as e:
87 87 # e.expected is an integer if length known or None otherwise.
88 88 if e.expected:
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (e.expected,
91 91 len(e.partial))
92 92 else:
93 93 msg = _('HTTP request error (incomplete response)')
94 94
95 95 raise error.PeerTransportError(
96 96 msg,
97 97 hint=_('this may be an intermittent network failure; '
98 98 'if the error persists, consider contacting the '
99 99 'network or server operator'))
100 100 except httplib.HTTPException as e:
101 101 raise error.PeerTransportError(
102 102 _('HTTP request error (%s)') % e,
103 103 hint=_('this may be an intermittent network failure; '
104 104 'if the error persists, consider contacting the '
105 105 'network or server operator'))
106 106
107 107 resp.__class__ = readerproxy
108 108
109 109 class _multifile(object):
110 110 def __init__(self, *fileobjs):
111 111 for f in fileobjs:
112 112 if not util.safehasattr(f, 'length'):
113 113 raise ValueError(
114 114 '_multifile only supports file objects that '
115 115 'have a length but this one does not:', type(f), f)
116 116 self._fileobjs = fileobjs
117 117 self._index = 0
118 118
119 119 @property
120 120 def length(self):
121 121 return sum(f.length for f in self._fileobjs)
122 122
123 123 def read(self, amt=None):
124 124 if amt <= 0:
125 125 return ''.join(f.read() for f in self._fileobjs)
126 126 parts = []
127 127 while amt and self._index < len(self._fileobjs):
128 128 parts.append(self._fileobjs[self._index].read(amt))
129 129 got = len(parts[-1])
130 130 if got < amt:
131 131 self._index += 1
132 132 amt -= got
133 133 return ''.join(parts)
134 134
135 135 def seek(self, offset, whence=os.SEEK_SET):
136 136 if whence != os.SEEK_SET:
137 137 raise NotImplementedError(
138 138 '_multifile does not support anything other'
139 139 ' than os.SEEK_SET for whence on seek()')
140 140 if offset != 0:
141 141 raise NotImplementedError(
142 142 '_multifile only supports seeking to start, but that '
143 143 'could be fixed if you need it')
144 144 for f in self._fileobjs:
145 145 f.seek(0)
146 146 self._index = 0
147 147
148 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
149 149 repobaseurl, cmd, args):
150 150 """Make an HTTP request to run a command for a version 1 client.
151 151
152 152 ``caps`` is a set of known server capabilities. The value may be
153 153 None if capabilities are not yet known.
154 154
155 155 ``capablefn`` is a function to evaluate a capability.
156 156
157 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
158 158 raw data to pass to it.
159 159 """
160 160 if cmd == 'pushkey':
161 161 args['data'] = ''
162 162 data = args.pop('data', None)
163 163 headers = args.pop('headers', {})
164 164
165 165 ui.debug("sending %s command\n" % cmd)
166 166 q = [('cmd', cmd)]
167 167 headersize = 0
168 168 # Important: don't use self.capable() here or else you end up
169 169 # with infinite recursion when trying to look up capabilities
170 170 # for the first time.
171 171 postargsok = caps is not None and 'httppostargs' in caps
172 172
173 173 # Send arguments via POST.
174 174 if postargsok and args:
175 175 strargs = urlreq.urlencode(sorted(args.items()))
176 176 if not data:
177 177 data = strargs
178 178 else:
179 179 if isinstance(data, bytes):
180 180 i = io.BytesIO(data)
181 181 i.length = len(data)
182 182 data = i
183 183 argsio = io.BytesIO(strargs)
184 184 argsio.length = len(strargs)
185 185 data = _multifile(argsio, data)
186 186 headers[r'X-HgArgs-Post'] = len(strargs)
187 187 elif args:
188 188 # Calling self.capable() can infinite loop if we are calling
189 189 # "capabilities". But that command should never accept wire
190 190 # protocol arguments. So this should never happen.
191 191 assert cmd != 'capabilities'
192 192 httpheader = capablefn('httpheader')
193 193 if httpheader:
194 194 headersize = int(httpheader.split(',', 1)[0])
195 195
196 196 # Send arguments via HTTP headers.
197 197 if headersize > 0:
198 198 # The headers can typically carry more data than the URL.
199 199 encargs = urlreq.urlencode(sorted(args.items()))
200 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
201 201 headersize):
202 202 headers[header] = value
203 203 # Send arguments via query string (Mercurial <1.9).
204 204 else:
205 205 q += sorted(args.items())
206 206
207 207 qs = '?%s' % urlreq.urlencode(q)
208 208 cu = "%s%s" % (repobaseurl, qs)
209 209 size = 0
210 210 if util.safehasattr(data, 'length'):
211 211 size = data.length
212 212 elif data is not None:
213 213 size = len(data)
214 214 if data is not None and r'Content-Type' not in headers:
215 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
216 216
217 217 # Tell the server we accept application/mercurial-0.2 and multiple
218 218 # compression formats if the server is capable of emitting those
219 219 # payloads.
220 220 # Note: Keep this set empty by default, as client advertisement of
221 221 # protocol parameters should only occur after the handshake.
222 222 protoparams = set()
223 223
224 224 mediatypes = set()
225 225 if caps is not None:
226 226 mt = capablefn('httpmediatype')
227 227 if mt:
228 228 protoparams.add('0.1')
229 229 mediatypes = set(mt.split(','))
230 230
231 231 protoparams.add('partial-pull')
232 232
233 233 if '0.2tx' in mediatypes:
234 234 protoparams.add('0.2')
235 235
236 236 if '0.2tx' in mediatypes and capablefn('compression'):
237 237 # We /could/ compare supported compression formats and prune
238 238 # non-mutually supported or error if nothing is mutually supported.
239 239 # For now, send the full list to the server and have it error.
240 240 comps = [e.wireprotosupport().name for e in
241 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
242 242 protoparams.add('comp=%s' % ','.join(comps))
243 243
244 244 if protoparams:
245 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
246 246 'X-HgProto',
247 247 headersize or 1024)
248 248 for header, value in protoheaders:
249 249 headers[header] = value
250 250
251 251 varyheaders = []
252 252 for header in headers:
253 253 if header.lower().startswith(r'x-hg'):
254 254 varyheaders.append(header)
255 255
256 256 if varyheaders:
257 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
258 258
259 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
260 260
261 261 if data is not None:
262 262 ui.debug("sending %d bytes\n" % size)
263 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
264 264
265 265 return req, cu, qs
266 266
267 267 def sendrequest(ui, opener, req):
268 268 """Send a prepared HTTP request.
269 269
270 270 Returns the response object.
271 271 """
272 272 if (ui.debugflag
273 273 and ui.configbool('devel', 'debug.peer-request')):
274 274 dbg = ui.debug
275 275 line = 'devel-peer-request: %s\n'
276 276 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
277 277 pycompat.bytesurl(req.get_full_url())))
278 278 hgargssize = None
279 279
280 280 for header, value in sorted(req.header_items()):
281 header = pycompat.bytesurl(header)
282 value = pycompat.bytesurl(value)
281 283 if header.startswith('X-hgarg-'):
282 284 if hgargssize is None:
283 285 hgargssize = 0
284 286 hgargssize += len(value)
285 287 else:
286 288 dbg(line % ' %s %s' % (header, value))
287 289
288 290 if hgargssize is not None:
289 291 dbg(line % ' %d bytes of commands arguments in headers'
290 292 % hgargssize)
291 293
292 294 if req.has_data():
293 295 data = req.get_data()
294 296 length = getattr(data, 'length', None)
295 297 if length is None:
296 298 length = len(data)
297 299 dbg(line % ' %d bytes of data' % length)
298 300
299 301 start = util.timer()
300 302
301 303 try:
302 304 res = opener.open(req)
303 305 except urlerr.httperror as inst:
304 306 if inst.code == 401:
305 307 raise error.Abort(_('authorization failed'))
306 308 raise
307 309 except httplib.HTTPException as inst:
308 310 ui.debug('http error requesting %s\n' %
309 311 util.hidepassword(req.get_full_url()))
310 312 ui.traceback()
311 313 raise IOError(None, inst)
312 314 finally:
313 315 if ui.configbool('devel', 'debug.peer-request'):
314 316 dbg(line % ' finished in %.4f seconds (%d)'
315 317 % (util.timer() - start, res.code))
316 318
317 319 # Insert error handlers for common I/O failures.
318 320 _wraphttpresponse(res)
319 321
320 322 return res
321 323
322 324 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
323 325 allowcbor=False):
324 326 # record the url we got redirected to
325 327 respurl = pycompat.bytesurl(resp.geturl())
326 328 if respurl.endswith(qs):
327 329 respurl = respurl[:-len(qs)]
328 330 if baseurl.rstrip('/') != respurl.rstrip('/'):
329 331 if not ui.quiet:
330 332 ui.warn(_('real URL is %s\n') % respurl)
331 333
332 334 try:
333 335 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
334 336 except AttributeError:
335 337 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
336 338
337 339 safeurl = util.hidepassword(baseurl)
338 340 if proto.startswith('application/hg-error'):
339 341 raise error.OutOfBandError(resp.read())
340 342
341 343 # Pre 1.0 versions of Mercurial used text/plain and
342 344 # application/hg-changegroup. We don't support such old servers.
343 345 if not proto.startswith('application/mercurial-'):
344 346 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
345 347 raise error.RepoError(
346 348 _("'%s' does not appear to be an hg repository:\n"
347 349 "---%%<--- (%s)\n%s\n---%%<---\n")
348 350 % (safeurl, proto or 'no content-type', resp.read(1024)))
349 351
350 352 try:
351 353 subtype = proto.split('-', 1)[1]
352 354
353 355 # Unless we end up supporting CBOR in the legacy wire protocol,
354 356 # this should ONLY be encountered for the initial capabilities
355 357 # request during handshake.
356 358 if subtype == 'cbor':
357 359 if allowcbor:
358 360 return respurl, proto, resp
359 361 else:
360 362 raise error.RepoError(_('unexpected CBOR response from '
361 363 'server'))
362 364
363 365 version_info = tuple([int(n) for n in subtype.split('.')])
364 366 except ValueError:
365 367 raise error.RepoError(_("'%s' sent a broken Content-Type "
366 368 "header (%s)") % (safeurl, proto))
367 369
368 370 # TODO consider switching to a decompression reader that uses
369 371 # generators.
370 372 if version_info == (0, 1):
371 373 if compressible:
372 374 resp = util.compengines['zlib'].decompressorreader(resp)
373 375
374 376 elif version_info == (0, 2):
375 377 # application/mercurial-0.2 always identifies the compression
376 378 # engine in the payload header.
377 379 elen = struct.unpack('B', resp.read(1))[0]
378 380 ename = resp.read(elen)
379 381 engine = util.compengines.forwiretype(ename)
380 382
381 383 resp = engine.decompressorreader(resp)
382 384 else:
383 385 raise error.RepoError(_("'%s' uses newer protocol %s") %
384 386 (safeurl, subtype))
385 387
386 388 return respurl, proto, resp
387 389
388 390 class httppeer(wireprotov1peer.wirepeer):
389 391 def __init__(self, ui, path, url, opener, requestbuilder, caps):
390 392 self.ui = ui
391 393 self._path = path
392 394 self._url = url
393 395 self._caps = caps
394 396 self._urlopener = opener
395 397 self._requestbuilder = requestbuilder
396 398
397 399 def __del__(self):
398 400 for h in self._urlopener.handlers:
399 401 h.close()
400 402 getattr(h, "close_all", lambda: None)()
401 403
402 404 # Begin of ipeerconnection interface.
403 405
404 406 def url(self):
405 407 return self._path
406 408
407 409 def local(self):
408 410 return None
409 411
410 412 def peer(self):
411 413 return self
412 414
413 415 def canpush(self):
414 416 return True
415 417
416 418 def close(self):
417 419 pass
418 420
419 421 # End of ipeerconnection interface.
420 422
421 423 # Begin of ipeercommands interface.
422 424
423 425 def capabilities(self):
424 426 return self._caps
425 427
426 428 # End of ipeercommands interface.
427 429
428 430 # look up capabilities only when needed
429 431
430 432 def _callstream(self, cmd, _compressible=False, **args):
431 433 args = pycompat.byteskwargs(args)
432 434
433 435 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
434 436 self._caps, self.capable,
435 437 self._url, cmd, args)
436 438
437 439 resp = sendrequest(self.ui, self._urlopener, req)
438 440
439 441 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
440 442 resp, _compressible)
441 443
442 444 return resp
443 445
444 446 def _call(self, cmd, **args):
445 447 fp = self._callstream(cmd, **args)
446 448 try:
447 449 return fp.read()
448 450 finally:
449 451 # if using keepalive, allow connection to be reused
450 452 fp.close()
451 453
452 454 def _callpush(self, cmd, cg, **args):
453 455 # have to stream bundle to a temp file because we do not have
454 456 # http 1.1 chunked transfer.
455 457
456 458 types = self.capable('unbundle')
457 459 try:
458 460 types = types.split(',')
459 461 except AttributeError:
460 462 # servers older than d1b16a746db6 will send 'unbundle' as a
461 463 # boolean capability. They only support headerless/uncompressed
462 464 # bundles.
463 465 types = [""]
464 466 for x in types:
465 467 if x in bundle2.bundletypes:
466 468 type = x
467 469 break
468 470
469 471 tempname = bundle2.writebundle(self.ui, cg, None, type)
470 472 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
471 473 headers = {r'Content-Type': r'application/mercurial-0.1'}
472 474
473 475 try:
474 476 r = self._call(cmd, data=fp, headers=headers, **args)
475 477 vals = r.split('\n', 1)
476 478 if len(vals) < 2:
477 479 raise error.ResponseError(_("unexpected response:"), r)
478 480 return vals
479 481 except urlerr.httperror:
480 482 # Catch and re-raise these so we don't try and treat them
481 483 # like generic socket errors. They lack any values in
482 484 # .args on Python 3 which breaks our socket.error block.
483 485 raise
484 486 except socket.error as err:
485 487 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
486 488 raise error.Abort(_('push failed: %s') % err.args[1])
487 489 raise error.Abort(err.args[1])
488 490 finally:
489 491 fp.close()
490 492 os.unlink(tempname)
491 493
492 494 def _calltwowaystream(self, cmd, fp, **args):
493 495 fh = None
494 496 fp_ = None
495 497 filename = None
496 498 try:
497 499 # dump bundle to disk
498 500 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
499 501 fh = os.fdopen(fd, r"wb")
500 502 d = fp.read(4096)
501 503 while d:
502 504 fh.write(d)
503 505 d = fp.read(4096)
504 506 fh.close()
505 507 # start http push
506 508 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
507 509 headers = {r'Content-Type': r'application/mercurial-0.1'}
508 510 return self._callstream(cmd, data=fp_, headers=headers, **args)
509 511 finally:
510 512 if fp_ is not None:
511 513 fp_.close()
512 514 if fh is not None:
513 515 fh.close()
514 516 os.unlink(filename)
515 517
516 518 def _callcompressable(self, cmd, **args):
517 519 return self._callstream(cmd, _compressible=True, **args)
518 520
519 521 def _abort(self, exception):
520 522 raise exception
521 523
522 524 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
523 525 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
524 526 buffersends=True)
525 527
526 528 handler = wireprotov2peer.clienthandler(ui, reactor)
527 529
528 530 url = '%s/%s' % (apiurl, permission)
529 531
530 532 if len(requests) > 1:
531 533 url += '/multirequest'
532 534 else:
533 535 url += '/%s' % requests[0][0]
534 536
535 537 for command, args, f in requests:
536 538 assert not list(handler.callcommand(command, args, f))
537 539
538 540 # TODO stream this.
539 541 body = b''.join(map(bytes, handler.flushcommands()))
540 542
541 543 # TODO modify user-agent to reflect v2
542 544 headers = {
543 545 r'Accept': wireprotov2server.FRAMINGTYPE,
544 546 r'Content-Type': wireprotov2server.FRAMINGTYPE,
545 547 }
546 548
547 549 req = requestbuilder(pycompat.strurl(url), body, headers)
548 550 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
549 551
550 552 try:
551 553 res = opener.open(req)
552 554 except urlerr.httperror as e:
553 555 if e.code == 401:
554 556 raise error.Abort(_('authorization failed'))
555 557
556 558 raise
557 559 except httplib.HTTPException as e:
558 560 ui.traceback()
559 561 raise IOError(None, e)
560 562
561 563 return handler, res
562 564
563 565 class queuedcommandfuture(pycompat.futures.Future):
564 566 """Wraps result() on command futures to trigger submission on call."""
565 567
566 568 def result(self, timeout=None):
567 569 if self.done():
568 570 return pycompat.futures.Future.result(self, timeout)
569 571
570 572 self._peerexecutor.sendcommands()
571 573
572 574 # sendcommands() will restore the original __class__ and self.result
573 575 # will resolve to Future.result.
574 576 return self.result(timeout)
575 577
576 578 @zi.implementer(repository.ipeercommandexecutor)
577 579 class httpv2executor(object):
578 580 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
579 581 self._ui = ui
580 582 self._opener = opener
581 583 self._requestbuilder = requestbuilder
582 584 self._apiurl = apiurl
583 585 self._descriptor = descriptor
584 586 self._sent = False
585 587 self._closed = False
586 588 self._neededpermissions = set()
587 589 self._calls = []
588 590 self._futures = weakref.WeakSet()
589 591 self._responseexecutor = None
590 592 self._responsef = None
591 593
592 594 def __enter__(self):
593 595 return self
594 596
595 597 def __exit__(self, exctype, excvalue, exctb):
596 598 self.close()
597 599
598 600 def callcommand(self, command, args):
599 601 if self._sent:
600 602 raise error.ProgrammingError('callcommand() cannot be used after '
601 603 'commands are sent')
602 604
603 605 if self._closed:
604 606 raise error.ProgrammingError('callcommand() cannot be used after '
605 607 'close()')
606 608
607 609 # The service advertises which commands are available. So if we attempt
608 610 # to call an unknown command or pass an unknown argument, we can screen
609 611 # for this.
610 612 if command not in self._descriptor['commands']:
611 613 raise error.ProgrammingError(
612 614 'wire protocol command %s is not available' % command)
613 615
614 616 cmdinfo = self._descriptor['commands'][command]
615 617 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
616 618
617 619 if unknownargs:
618 620 raise error.ProgrammingError(
619 621 'wire protocol command %s does not accept argument: %s' % (
620 622 command, ', '.join(sorted(unknownargs))))
621 623
622 624 self._neededpermissions |= set(cmdinfo['permissions'])
623 625
624 626 # TODO we /could/ also validate types here, since the API descriptor
625 627 # includes types...
626 628
627 629 f = pycompat.futures.Future()
628 630
629 631 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
630 632 # could deadlock.
631 633 f.__class__ = queuedcommandfuture
632 634 f._peerexecutor = self
633 635
634 636 self._futures.add(f)
635 637 self._calls.append((command, args, f))
636 638
637 639 return f
638 640
639 641 def sendcommands(self):
640 642 if self._sent:
641 643 return
642 644
643 645 if not self._calls:
644 646 return
645 647
646 648 self._sent = True
647 649
648 650 # Unhack any future types so caller sees a clean type and so we
649 651 # break reference cycle.
650 652 for f in self._futures:
651 653 if isinstance(f, queuedcommandfuture):
652 654 f.__class__ = pycompat.futures.Future
653 655 f._peerexecutor = None
654 656
655 657 # Mark the future as running and filter out cancelled futures.
656 658 calls = [(command, args, f)
657 659 for command, args, f in self._calls
658 660 if f.set_running_or_notify_cancel()]
659 661
660 662 # Clear out references, prevent improper object usage.
661 663 self._calls = None
662 664
663 665 if not calls:
664 666 return
665 667
666 668 permissions = set(self._neededpermissions)
667 669
668 670 if 'push' in permissions and 'pull' in permissions:
669 671 permissions.remove('pull')
670 672
671 673 if len(permissions) > 1:
672 674 raise error.RepoError(_('cannot make request requiring multiple '
673 675 'permissions: %s') %
674 676 _(', ').join(sorted(permissions)))
675 677
676 678 permission = {
677 679 'push': 'rw',
678 680 'pull': 'ro',
679 681 }[permissions.pop()]
680 682
681 683 handler, resp = sendv2request(
682 684 self._ui, self._opener, self._requestbuilder, self._apiurl,
683 685 permission, calls)
684 686
685 687 # TODO we probably want to validate the HTTP code, media type, etc.
686 688
687 689 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
688 690 self._responsef = self._responseexecutor.submit(self._handleresponse,
689 691 handler, resp)
690 692
691 693 def close(self):
692 694 if self._closed:
693 695 return
694 696
695 697 self.sendcommands()
696 698
697 699 self._closed = True
698 700
699 701 if not self._responsef:
700 702 return
701 703
702 704 try:
703 705 self._responsef.result()
704 706 finally:
705 707 self._responseexecutor.shutdown(wait=True)
706 708 self._responsef = None
707 709 self._responseexecutor = None
708 710
709 711 # If any of our futures are still in progress, mark them as
710 712 # errored, otherwise a result() could wait indefinitely.
711 713 for f in self._futures:
712 714 if not f.done():
713 715 f.set_exception(error.ResponseError(
714 716 _('unfulfilled command response')))
715 717
716 718 self._futures = None
717 719
718 720 def _handleresponse(self, handler, resp):
719 721 # Called in a thread to read the response.
720 722
721 723 while handler.readframe(resp):
722 724 pass
723 725
724 726 # TODO implement interface for version 2 peers
725 727 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
726 728 repository.ipeerrequests)
727 729 class httpv2peer(object):
728 730 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
729 731 apidescriptor):
730 732 self.ui = ui
731 733
732 734 if repourl.endswith('/'):
733 735 repourl = repourl[:-1]
734 736
735 737 self._url = repourl
736 738 self._apipath = apipath
737 739 self._apiurl = '%s/%s' % (repourl, apipath)
738 740 self._opener = opener
739 741 self._requestbuilder = requestbuilder
740 742 self._descriptor = apidescriptor
741 743
742 744 # Start of ipeerconnection.
743 745
744 746 def url(self):
745 747 return self._url
746 748
747 749 def local(self):
748 750 return None
749 751
750 752 def peer(self):
751 753 return self
752 754
753 755 def canpush(self):
754 756 # TODO change once implemented.
755 757 return False
756 758
757 759 def close(self):
758 760 pass
759 761
760 762 # End of ipeerconnection.
761 763
762 764 # Start of ipeercapabilities.
763 765
764 766 def capable(self, name):
765 767 # The capabilities used internally historically map to capabilities
766 768 # advertised from the "capabilities" wire protocol command. However,
767 769 # version 2 of that command works differently.
768 770
769 771 # Maps to commands that are available.
770 772 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
771 773 return True
772 774
773 775 # Other concepts.
774 776 if name in ('bundle2',):
775 777 return True
776 778
777 779 return False
778 780
779 781 def requirecap(self, name, purpose):
780 782 if self.capable(name):
781 783 return
782 784
783 785 raise error.CapabilityError(
784 786 _('cannot %s; client or remote repository does not support the %r '
785 787 'capability') % (purpose, name))
786 788
787 789 # End of ipeercapabilities.
788 790
789 791 def _call(self, name, **args):
790 792 with self.commandexecutor() as e:
791 793 return e.callcommand(name, args).result()
792 794
793 795 def commandexecutor(self):
794 796 return httpv2executor(self.ui, self._opener, self._requestbuilder,
795 797 self._apiurl, self._descriptor)
796 798
797 799 # Registry of API service names to metadata about peers that handle it.
798 800 #
799 801 # The following keys are meaningful:
800 802 #
801 803 # init
802 804 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
803 805 # apidescriptor) to create a peer.
804 806 #
805 807 # priority
806 808 # Integer priority for the service. If we could choose from multiple
807 809 # services, we choose the one with the highest priority.
808 810 API_PEERS = {
809 811 wireprototypes.HTTP_WIREPROTO_V2: {
810 812 'init': httpv2peer,
811 813 'priority': 50,
812 814 },
813 815 }
814 816
815 817 def performhandshake(ui, url, opener, requestbuilder):
816 818 # The handshake is a request to the capabilities command.
817 819
818 820 caps = None
819 821 def capable(x):
820 822 raise error.ProgrammingError('should not be called')
821 823
822 824 args = {}
823 825
824 826 # The client advertises support for newer protocols by adding an
825 827 # X-HgUpgrade-* header with a list of supported APIs and an
826 828 # X-HgProto-* header advertising which serializing formats it supports.
827 829 # We only support the HTTP version 2 transport and CBOR responses for
828 830 # now.
829 831 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
830 832
831 833 if advertisev2:
832 834 args['headers'] = {
833 835 r'X-HgProto-1': r'cbor',
834 836 }
835 837
836 838 args['headers'].update(
837 839 encodevalueinheaders(' '.join(sorted(API_PEERS)),
838 840 'X-HgUpgrade',
839 841 # We don't know the header limit this early.
840 842 # So make it small.
841 843 1024))
842 844
843 845 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
844 846 capable, url, 'capabilities',
845 847 args)
846 848
847 849 resp = sendrequest(ui, opener, req)
848 850
849 851 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
850 852 compressible=False,
851 853 allowcbor=advertisev2)
852 854
853 855 try:
854 856 rawdata = resp.read()
855 857 finally:
856 858 resp.close()
857 859
858 860 if not ct.startswith('application/mercurial-'):
859 861 raise error.ProgrammingError('unexpected content-type: %s' % ct)
860 862
861 863 if advertisev2:
862 864 if ct == 'application/mercurial-cbor':
863 865 try:
864 866 info = cbor.loads(rawdata)
865 867 except cbor.CBORDecodeError:
866 868 raise error.Abort(_('error decoding CBOR from remote server'),
867 869 hint=_('try again and consider contacting '
868 870 'the server operator'))
869 871
870 872 # We got a legacy response. That's fine.
871 873 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
872 874 info = {
873 875 'v1capabilities': set(rawdata.split())
874 876 }
875 877
876 878 else:
877 879 raise error.RepoError(
878 880 _('unexpected response type from server: %s') % ct)
879 881 else:
880 882 info = {
881 883 'v1capabilities': set(rawdata.split())
882 884 }
883 885
884 886 return respurl, info
885 887
886 888 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
887 889 """Construct an appropriate HTTP peer instance.
888 890
889 891 ``opener`` is an ``url.opener`` that should be used to establish
890 892 connections, perform HTTP requests.
891 893
892 894 ``requestbuilder`` is the type used for constructing HTTP requests.
893 895 It exists as an argument so extensions can override the default.
894 896 """
895 897 u = util.url(path)
896 898 if u.query or u.fragment:
897 899 raise error.Abort(_('unsupported URL component: "%s"') %
898 900 (u.query or u.fragment))
899 901
900 902 # urllib cannot handle URLs with embedded user or passwd.
901 903 url, authinfo = u.authinfo()
902 904 ui.debug('using %s\n' % url)
903 905
904 906 opener = opener or urlmod.opener(ui, authinfo)
905 907
906 908 respurl, info = performhandshake(ui, url, opener, requestbuilder)
907 909
908 910 # Given the intersection of APIs that both we and the server support,
909 911 # sort by their advertised priority and pick the first one.
910 912 #
911 913 # TODO consider making this request-based and interface driven. For
912 914 # example, the caller could say "I want a peer that does X." It's quite
913 915 # possible that not all peers would do that. Since we know the service
914 916 # capabilities, we could filter out services not meeting the
915 917 # requirements. Possibly by consulting the interfaces defined by the
916 918 # peer type.
917 919 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
918 920
919 921 preferredchoices = sorted(apipeerchoices,
920 922 key=lambda x: API_PEERS[x]['priority'],
921 923 reverse=True)
922 924
923 925 for service in preferredchoices:
924 926 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
925 927
926 928 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
927 929 requestbuilder,
928 930 info['apis'][service])
929 931
930 932 # Failed to construct an API peer. Fall back to legacy.
931 933 return httppeer(ui, path, respurl, opener, requestbuilder,
932 934 info['v1capabilities'])
933 935
934 936 def instance(ui, path, create, intents=None):
935 937 if create:
936 938 raise error.Abort(_('cannot create new http repository'))
937 939 try:
938 940 if path.startswith('https:') and not urlmod.has_https:
939 941 raise error.Abort(_('Python support for SSL and HTTPS '
940 942 'is not installed'))
941 943
942 944 inst = makepeer(ui, path)
943 945
944 946 return inst
945 947 except error.RepoError as httpexception:
946 948 try:
947 949 r = statichttprepo.instance(ui, "static-" + path, create)
948 950 ui.note(_('(falling back to static-http)\n'))
949 951 return r
950 952 except error.RepoError:
951 953 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now