##// END OF EJS Templates
httppeer: implement command executor for version 2 peer...
Gregory Szorc -
r37669:950294e2 default
parent child Browse files
Show More
@@ -1,817 +1,1001 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 import sys
16 17 import tempfile
18 import weakref
17 19
18 20 from .i18n import _
19 21 from .thirdparty import (
20 22 cbor,
21 23 )
22 24 from .thirdparty.zope import (
23 25 interface as zi,
24 26 )
25 27 from . import (
26 28 bundle2,
27 29 error,
28 30 httpconnection,
29 31 pycompat,
30 32 repository,
31 33 statichttprepo,
32 34 url as urlmod,
33 35 util,
34 wireproto,
35 36 wireprotoframing,
36 37 wireprototypes,
37 38 wireprotov1peer,
38 39 wireprotov2server,
39 40 )
40 41
41 42 httplib = util.httplib
42 43 urlerr = util.urlerr
43 44 urlreq = util.urlreq
44 45
45 46 def encodevalueinheaders(value, header, limit):
46 47 """Encode a string value into multiple HTTP headers.
47 48
48 49 ``value`` will be encoded into 1 or more HTTP headers with the names
49 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
50 51 name + value will be at most ``limit`` bytes long.
51 52
52 53 Returns an iterable of 2-tuples consisting of header names and
53 54 values as native strings.
54 55 """
55 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
56 57 # not bytes. This function always takes bytes in as arguments.
57 58 fmt = pycompat.strurl(header) + r'-%s'
58 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
59 60 # and not a unicode: we're just getting the encoded length anyway,
60 61 # and using an r-string to make it portable between Python 2 and 3
61 62 # doesn't work because then the \r is a literal backslash-r
62 63 # instead of a carriage return.
63 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
64 65 result = []
65 66
66 67 n = 0
67 68 for i in xrange(0, len(value), valuelen):
68 69 n += 1
69 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
70 71
71 72 return result
72 73
73 74 def _wraphttpresponse(resp):
74 75 """Wrap an HTTPResponse with common error handlers.
75 76
76 77 This ensures that any I/O from any consumer raises the appropriate
77 78 error and messaging.
78 79 """
79 80 origread = resp.read
80 81
81 82 class readerproxy(resp.__class__):
82 83 def read(self, size=None):
83 84 try:
84 85 return origread(size)
85 86 except httplib.IncompleteRead as e:
86 87 # e.expected is an integer if length known or None otherwise.
87 88 if e.expected:
88 89 msg = _('HTTP request error (incomplete response; '
89 90 'expected %d bytes got %d)') % (e.expected,
90 91 len(e.partial))
91 92 else:
92 93 msg = _('HTTP request error (incomplete response)')
93 94
94 95 raise error.PeerTransportError(
95 96 msg,
96 97 hint=_('this may be an intermittent network failure; '
97 98 'if the error persists, consider contacting the '
98 99 'network or server operator'))
99 100 except httplib.HTTPException as e:
100 101 raise error.PeerTransportError(
101 102 _('HTTP request error (%s)') % e,
102 103 hint=_('this may be an intermittent network failure; '
103 104 'if the error persists, consider contacting the '
104 105 'network or server operator'))
105 106
106 107 resp.__class__ = readerproxy
107 108
108 109 class _multifile(object):
109 110 def __init__(self, *fileobjs):
110 111 for f in fileobjs:
111 112 if not util.safehasattr(f, 'length'):
112 113 raise ValueError(
113 114 '_multifile only supports file objects that '
114 115 'have a length but this one does not:', type(f), f)
115 116 self._fileobjs = fileobjs
116 117 self._index = 0
117 118
118 119 @property
119 120 def length(self):
120 121 return sum(f.length for f in self._fileobjs)
121 122
122 123 def read(self, amt=None):
123 124 if amt <= 0:
124 125 return ''.join(f.read() for f in self._fileobjs)
125 126 parts = []
126 127 while amt and self._index < len(self._fileobjs):
127 128 parts.append(self._fileobjs[self._index].read(amt))
128 129 got = len(parts[-1])
129 130 if got < amt:
130 131 self._index += 1
131 132 amt -= got
132 133 return ''.join(parts)
133 134
134 135 def seek(self, offset, whence=os.SEEK_SET):
135 136 if whence != os.SEEK_SET:
136 137 raise NotImplementedError(
137 138 '_multifile does not support anything other'
138 139 ' than os.SEEK_SET for whence on seek()')
139 140 if offset != 0:
140 141 raise NotImplementedError(
141 142 '_multifile only supports seeking to start, but that '
142 143 'could be fixed if you need it')
143 144 for f in self._fileobjs:
144 145 f.seek(0)
145 146 self._index = 0
146 147
147 148 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 149 repobaseurl, cmd, args):
149 150 """Make an HTTP request to run a command for a version 1 client.
150 151
151 152 ``caps`` is a set of known server capabilities. The value may be
152 153 None if capabilities are not yet known.
153 154
154 155 ``capablefn`` is a function to evaluate a capability.
155 156
156 157 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
157 158 raw data to pass to it.
158 159 """
159 160 if cmd == 'pushkey':
160 161 args['data'] = ''
161 162 data = args.pop('data', None)
162 163 headers = args.pop('headers', {})
163 164
164 165 ui.debug("sending %s command\n" % cmd)
165 166 q = [('cmd', cmd)]
166 167 headersize = 0
167 168 # Important: don't use self.capable() here or else you end up
168 169 # with infinite recursion when trying to look up capabilities
169 170 # for the first time.
170 171 postargsok = caps is not None and 'httppostargs' in caps
171 172
172 173 # Send arguments via POST.
173 174 if postargsok and args:
174 175 strargs = urlreq.urlencode(sorted(args.items()))
175 176 if not data:
176 177 data = strargs
177 178 else:
178 179 if isinstance(data, bytes):
179 180 i = io.BytesIO(data)
180 181 i.length = len(data)
181 182 data = i
182 183 argsio = io.BytesIO(strargs)
183 184 argsio.length = len(strargs)
184 185 data = _multifile(argsio, data)
185 186 headers[r'X-HgArgs-Post'] = len(strargs)
186 187 elif args:
187 188 # Calling self.capable() can infinite loop if we are calling
188 189 # "capabilities". But that command should never accept wire
189 190 # protocol arguments. So this should never happen.
190 191 assert cmd != 'capabilities'
191 192 httpheader = capablefn('httpheader')
192 193 if httpheader:
193 194 headersize = int(httpheader.split(',', 1)[0])
194 195
195 196 # Send arguments via HTTP headers.
196 197 if headersize > 0:
197 198 # The headers can typically carry more data than the URL.
198 199 encargs = urlreq.urlencode(sorted(args.items()))
199 200 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 201 headersize):
201 202 headers[header] = value
202 203 # Send arguments via query string (Mercurial <1.9).
203 204 else:
204 205 q += sorted(args.items())
205 206
206 207 qs = '?%s' % urlreq.urlencode(q)
207 208 cu = "%s%s" % (repobaseurl, qs)
208 209 size = 0
209 210 if util.safehasattr(data, 'length'):
210 211 size = data.length
211 212 elif data is not None:
212 213 size = len(data)
213 214 if data is not None and r'Content-Type' not in headers:
214 215 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 216
216 217 # Tell the server we accept application/mercurial-0.2 and multiple
217 218 # compression formats if the server is capable of emitting those
218 219 # payloads.
219 220 # Note: Keep this set empty by default, as client advertisement of
220 221 # protocol parameters should only occur after the handshake.
221 222 protoparams = set()
222 223
223 224 mediatypes = set()
224 225 if caps is not None:
225 226 mt = capablefn('httpmediatype')
226 227 if mt:
227 228 protoparams.add('0.1')
228 229 mediatypes = set(mt.split(','))
229 230
230 231 protoparams.add('partial-pull')
231 232
232 233 if '0.2tx' in mediatypes:
233 234 protoparams.add('0.2')
234 235
235 236 if '0.2tx' in mediatypes and capablefn('compression'):
236 237 # We /could/ compare supported compression formats and prune
237 238 # non-mutually supported or error if nothing is mutually supported.
238 239 # For now, send the full list to the server and have it error.
239 240 comps = [e.wireprotosupport().name for e in
240 241 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 242 protoparams.add('comp=%s' % ','.join(comps))
242 243
243 244 if protoparams:
244 245 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 246 'X-HgProto',
246 247 headersize or 1024)
247 248 for header, value in protoheaders:
248 249 headers[header] = value
249 250
250 251 varyheaders = []
251 252 for header in headers:
252 253 if header.lower().startswith(r'x-hg'):
253 254 varyheaders.append(header)
254 255
255 256 if varyheaders:
256 257 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 258
258 259 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 260
260 261 if data is not None:
261 262 ui.debug("sending %d bytes\n" % size)
262 263 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 264
264 265 return req, cu, qs
265 266
266 267 def sendrequest(ui, opener, req):
267 268 """Send a prepared HTTP request.
268 269
269 270 Returns the response object.
270 271 """
271 272 if (ui.debugflag
272 273 and ui.configbool('devel', 'debug.peer-request')):
273 274 dbg = ui.debug
274 275 line = 'devel-peer-request: %s\n'
275 276 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
276 277 hgargssize = None
277 278
278 279 for header, value in sorted(req.header_items()):
279 280 if header.startswith('X-hgarg-'):
280 281 if hgargssize is None:
281 282 hgargssize = 0
282 283 hgargssize += len(value)
283 284 else:
284 285 dbg(line % ' %s %s' % (header, value))
285 286
286 287 if hgargssize is not None:
287 288 dbg(line % ' %d bytes of commands arguments in headers'
288 289 % hgargssize)
289 290
290 291 if req.has_data():
291 292 data = req.get_data()
292 293 length = getattr(data, 'length', None)
293 294 if length is None:
294 295 length = len(data)
295 296 dbg(line % ' %d bytes of data' % length)
296 297
297 298 start = util.timer()
298 299
299 300 try:
300 301 res = opener.open(req)
301 302 except urlerr.httperror as inst:
302 303 if inst.code == 401:
303 304 raise error.Abort(_('authorization failed'))
304 305 raise
305 306 except httplib.HTTPException as inst:
306 307 ui.debug('http error requesting %s\n' %
307 308 util.hidepassword(req.get_full_url()))
308 309 ui.traceback()
309 310 raise IOError(None, inst)
310 311 finally:
311 312 if ui.configbool('devel', 'debug.peer-request'):
312 313 dbg(line % ' finished in %.4f seconds (%s)'
313 314 % (util.timer() - start, res.code))
314 315
315 316 # Insert error handlers for common I/O failures.
316 317 _wraphttpresponse(res)
317 318
318 319 return res
319 320
320 321 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
321 322 allowcbor=False):
322 323 # record the url we got redirected to
323 324 respurl = pycompat.bytesurl(resp.geturl())
324 325 if respurl.endswith(qs):
325 326 respurl = respurl[:-len(qs)]
326 327 if baseurl.rstrip('/') != respurl.rstrip('/'):
327 328 if not ui.quiet:
328 329 ui.warn(_('real URL is %s\n') % respurl)
329 330
330 331 try:
331 332 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
332 333 except AttributeError:
333 334 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
334 335
335 336 safeurl = util.hidepassword(baseurl)
336 337 if proto.startswith('application/hg-error'):
337 338 raise error.OutOfBandError(resp.read())
338 339
339 340 # Pre 1.0 versions of Mercurial used text/plain and
340 341 # application/hg-changegroup. We don't support such old servers.
341 342 if not proto.startswith('application/mercurial-'):
342 343 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
343 344 raise error.RepoError(
344 345 _("'%s' does not appear to be an hg repository:\n"
345 346 "---%%<--- (%s)\n%s\n---%%<---\n")
346 347 % (safeurl, proto or 'no content-type', resp.read(1024)))
347 348
348 349 try:
349 350 subtype = proto.split('-', 1)[1]
350 351
351 352 # Unless we end up supporting CBOR in the legacy wire protocol,
352 353 # this should ONLY be encountered for the initial capabilities
353 354 # request during handshake.
354 355 if subtype == 'cbor':
355 356 if allowcbor:
356 357 return respurl, proto, resp
357 358 else:
358 359 raise error.RepoError(_('unexpected CBOR response from '
359 360 'server'))
360 361
361 362 version_info = tuple([int(n) for n in subtype.split('.')])
362 363 except ValueError:
363 364 raise error.RepoError(_("'%s' sent a broken Content-Type "
364 365 "header (%s)") % (safeurl, proto))
365 366
366 367 # TODO consider switching to a decompression reader that uses
367 368 # generators.
368 369 if version_info == (0, 1):
369 370 if compressible:
370 371 resp = util.compengines['zlib'].decompressorreader(resp)
371 372
372 373 elif version_info == (0, 2):
373 374 # application/mercurial-0.2 always identifies the compression
374 375 # engine in the payload header.
375 376 elen = struct.unpack('B', resp.read(1))[0]
376 377 ename = resp.read(elen)
377 378 engine = util.compengines.forwiretype(ename)
378 379
379 380 resp = engine.decompressorreader(resp)
380 381 else:
381 382 raise error.RepoError(_("'%s' uses newer protocol %s") %
382 383 (safeurl, subtype))
383 384
384 385 return respurl, proto, resp
385 386
386 387 class httppeer(wireprotov1peer.wirepeer):
387 388 def __init__(self, ui, path, url, opener, requestbuilder, caps):
388 389 self.ui = ui
389 390 self._path = path
390 391 self._url = url
391 392 self._caps = caps
392 393 self._urlopener = opener
393 394 self._requestbuilder = requestbuilder
394 395
395 396 def __del__(self):
396 397 for h in self._urlopener.handlers:
397 398 h.close()
398 399 getattr(h, "close_all", lambda: None)()
399 400
400 401 # Begin of ipeerconnection interface.
401 402
402 403 def url(self):
403 404 return self._path
404 405
405 406 def local(self):
406 407 return None
407 408
408 409 def peer(self):
409 410 return self
410 411
411 412 def canpush(self):
412 413 return True
413 414
414 415 def close(self):
415 416 pass
416 417
417 418 # End of ipeerconnection interface.
418 419
419 420 # Begin of ipeercommands interface.
420 421
421 422 def capabilities(self):
422 423 return self._caps
423 424
424 425 # End of ipeercommands interface.
425 426
426 427 # look up capabilities only when needed
427 428
428 429 def _callstream(self, cmd, _compressible=False, **args):
429 430 args = pycompat.byteskwargs(args)
430 431
431 432 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
432 433 self._caps, self.capable,
433 434 self._url, cmd, args)
434 435
435 436 resp = sendrequest(self.ui, self._urlopener, req)
436 437
437 438 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
438 439 resp, _compressible)
439 440
440 441 return resp
441 442
442 443 def _call(self, cmd, **args):
443 444 fp = self._callstream(cmd, **args)
444 445 try:
445 446 return fp.read()
446 447 finally:
447 448 # if using keepalive, allow connection to be reused
448 449 fp.close()
449 450
450 451 def _callpush(self, cmd, cg, **args):
451 452 # have to stream bundle to a temp file because we do not have
452 453 # http 1.1 chunked transfer.
453 454
454 455 types = self.capable('unbundle')
455 456 try:
456 457 types = types.split(',')
457 458 except AttributeError:
458 459 # servers older than d1b16a746db6 will send 'unbundle' as a
459 460 # boolean capability. They only support headerless/uncompressed
460 461 # bundles.
461 462 types = [""]
462 463 for x in types:
463 464 if x in bundle2.bundletypes:
464 465 type = x
465 466 break
466 467
467 468 tempname = bundle2.writebundle(self.ui, cg, None, type)
468 469 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
469 470 headers = {r'Content-Type': r'application/mercurial-0.1'}
470 471
471 472 try:
472 473 r = self._call(cmd, data=fp, headers=headers, **args)
473 474 vals = r.split('\n', 1)
474 475 if len(vals) < 2:
475 476 raise error.ResponseError(_("unexpected response:"), r)
476 477 return vals
477 478 except urlerr.httperror:
478 479 # Catch and re-raise these so we don't try and treat them
479 480 # like generic socket errors. They lack any values in
480 481 # .args on Python 3 which breaks our socket.error block.
481 482 raise
482 483 except socket.error as err:
483 484 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
484 485 raise error.Abort(_('push failed: %s') % err.args[1])
485 486 raise error.Abort(err.args[1])
486 487 finally:
487 488 fp.close()
488 489 os.unlink(tempname)
489 490
490 491 def _calltwowaystream(self, cmd, fp, **args):
491 492 fh = None
492 493 fp_ = None
493 494 filename = None
494 495 try:
495 496 # dump bundle to disk
496 497 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
497 498 fh = os.fdopen(fd, r"wb")
498 499 d = fp.read(4096)
499 500 while d:
500 501 fh.write(d)
501 502 d = fp.read(4096)
502 503 fh.close()
503 504 # start http push
504 505 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
505 506 headers = {r'Content-Type': r'application/mercurial-0.1'}
506 507 return self._callstream(cmd, data=fp_, headers=headers, **args)
507 508 finally:
508 509 if fp_ is not None:
509 510 fp_.close()
510 511 if fh is not None:
511 512 fh.close()
512 513 os.unlink(filename)
513 514
514 515 def _callcompressable(self, cmd, **args):
515 516 return self._callstream(cmd, _compressible=True, **args)
516 517
517 518 def _abort(self, exception):
518 519 raise exception
519 520
521 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
523 buffersends=True)
524
525 url = '%s/%s' % (apiurl, permission)
526
527 if len(requests) > 1:
528 url += '/multirequest'
529 else:
530 url += '/%s' % requests[0][0]
531
532 # Request ID to (request, future)
533 requestmap = {}
534
535 for command, args, f in requests:
536 request, action, meta = reactor.callcommand(command, args)
537 assert action == 'noop'
538
539 requestmap[request.requestid] = (request, f)
540
541 action, meta = reactor.flushcommands()
542 assert action == 'sendframes'
543
544 # TODO stream this.
545 body = b''.join(map(bytes, meta['framegen']))
546
547 # TODO modify user-agent to reflect v2
548 headers = {
549 r'Accept': wireprotov2server.FRAMINGTYPE,
550 r'Content-Type': wireprotov2server.FRAMINGTYPE,
551 }
552
553 req = requestbuilder(pycompat.strurl(url), body, headers)
554 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
555
556 try:
557 res = opener.open(req)
558 except urlerr.httperror as e:
559 if e.code == 401:
560 raise error.Abort(_('authorization failed'))
561
562 raise
563 except httplib.HTTPException as e:
564 ui.traceback()
565 raise IOError(None, e)
566
567 return reactor, requestmap, res
568
569 class queuedcommandfuture(pycompat.futures.Future):
570 """Wraps result() on command futures to trigger submission on call."""
571
572 def result(self, timeout=None):
573 if self.done():
574 return pycompat.futures.Future.result(self, timeout)
575
576 self._peerexecutor.sendcommands()
577
578 # sendcommands() will restore the original __class__ and self.result
579 # will resolve to Future.result.
580 return self.result(timeout)
581
582 @zi.implementer(repository.ipeercommandexecutor)
583 class httpv2executor(object):
584 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
585 self._ui = ui
586 self._opener = opener
587 self._requestbuilder = requestbuilder
588 self._apiurl = apiurl
589 self._descriptor = descriptor
590 self._sent = False
591 self._closed = False
592 self._neededpermissions = set()
593 self._calls = []
594 self._futures = weakref.WeakSet()
595 self._responseexecutor = None
596 self._responsef = None
597
598 def __enter__(self):
599 return self
600
601 def __exit__(self, exctype, excvalue, exctb):
602 self.close()
603
604 def callcommand(self, command, args):
605 if self._sent:
606 raise error.ProgrammingError('callcommand() cannot be used after '
607 'commands are sent')
608
609 if self._closed:
610 raise error.ProgrammingError('callcommand() cannot be used after '
611 'close()')
612
613 # The service advertises which commands are available. So if we attempt
614 # to call an unknown command or pass an unknown argument, we can screen
615 # for this.
616 if command not in self._descriptor['commands']:
617 raise error.ProgrammingError(
618 'wire protocol command %s is not available' % command)
619
620 cmdinfo = self._descriptor['commands'][command]
621 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
622
623 if unknownargs:
624 raise error.ProgrammingError(
625 'wire protocol command %s does not accept argument: %s' % (
626 command, ', '.join(sorted(unknownargs))))
627
628 self._neededpermissions |= set(cmdinfo['permissions'])
629
630 # TODO we /could/ also validate types here, since the API descriptor
631 # includes types...
632
633 f = pycompat.futures.Future()
634
635 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
636 # could deadlock.
637 f.__class__ = queuedcommandfuture
638 f._peerexecutor = self
639
640 self._futures.add(f)
641 self._calls.append((command, args, f))
642
643 return f
644
645 def sendcommands(self):
646 if self._sent:
647 return
648
649 if not self._calls:
650 return
651
652 self._sent = True
653
654 # Unhack any future types so caller sees a clean type and so we
655 # break reference cycle.
656 for f in self._futures:
657 if isinstance(f, queuedcommandfuture):
658 f.__class__ = pycompat.futures.Future
659 f._peerexecutor = None
660
661 # Mark the future as running and filter out cancelled futures.
662 calls = [(command, args, f)
663 for command, args, f in self._calls
664 if f.set_running_or_notify_cancel()]
665
666 # Clear out references, prevent improper object usage.
667 self._calls = None
668
669 if not calls:
670 return
671
672 permissions = set(self._neededpermissions)
673
674 if 'push' in permissions and 'pull' in permissions:
675 permissions.remove('pull')
676
677 if len(permissions) > 1:
678 raise error.RepoError(_('cannot make request requiring multiple '
679 'permissions: %s') %
680 _(', ').join(sorted(permissions)))
681
682 permission = {
683 'push': 'rw',
684 'pull': 'ro',
685 }[permissions.pop()]
686
687 reactor, requests, resp = sendv2request(
688 self._ui, self._opener, self._requestbuilder, self._apiurl,
689 permission, calls)
690
691 # TODO we probably want to validate the HTTP code, media type, etc.
692
693 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
694 self._responsef = self._responseexecutor.submit(self._handleresponse,
695 reactor,
696 requests,
697 resp)
698
699 def close(self):
700 if self._closed:
701 return
702
703 self.sendcommands()
704
705 self._closed = True
706
707 if not self._responsef:
708 return
709
710 try:
711 self._responsef.result()
712 finally:
713 self._responseexecutor.shutdown(wait=True)
714 self._responsef = None
715 self._responseexecutor = None
716
717 # If any of our futures are still in progress, mark them as
718 # errored, otherwise a result() could wait indefinitely.
719 for f in self._futures:
720 if not f.done():
721 f.set_exception(error.ResponseError(
722 _('unfulfilled command response')))
723
724 self._futures = None
725
726 def _handleresponse(self, reactor, requests, resp):
727 # Called in a thread to read the response.
728
729 results = {k: [] for k in requests}
730
731 while True:
732 frame = wireprotoframing.readframe(resp)
733 if frame is None:
734 break
735
736 self._ui.note(_('received %r\n') % frame)
737
738 # Guard against receiving a frame with a request ID that we
739 # didn't issue. This should never happen.
740 request, f = requests.get(frame.requestid, [None, None])
741
742 action, meta = reactor.onframerecv(frame)
743
744 if action == 'responsedata':
745 assert request.requestid == meta['request'].requestid
746
747 result = results[request.requestid]
748
749 if meta['cbor']:
750 payload = util.bytesio(meta['data'])
751
752 decoder = cbor.CBORDecoder(payload)
753 while payload.tell() + 1 < len(meta['data']):
754 try:
755 result.append(decoder.decode())
756 except Exception:
757 f.set_exception_info(*sys.exc_info()[1:])
758 continue
759 else:
760 result.append(meta['data'])
761
762 if meta['eos']:
763 f.set_result(result)
764 del results[request.requestid]
765
766 else:
767 e = error.ProgrammingError('unhandled action: %s' % action)
768
769 if f:
770 f.set_exception(e)
771 else:
772 raise e
773
520 774 # TODO implement interface for version 2 peers
521 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
775 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
776 repository.ipeerrequests)
522 777 class httpv2peer(object):
523 778 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
524 779 apidescriptor):
525 780 self.ui = ui
526 781
527 782 if repourl.endswith('/'):
528 783 repourl = repourl[:-1]
529 784
530 785 self._url = repourl
531 786 self._apipath = apipath
787 self._apiurl = '%s/%s' % (repourl, apipath)
532 788 self._opener = opener
533 789 self._requestbuilder = requestbuilder
534 790 self._descriptor = apidescriptor
535 791
536 792 # Start of ipeerconnection.
537 793
538 794 def url(self):
539 795 return self._url
540 796
541 797 def local(self):
542 798 return None
543 799
544 800 def peer(self):
545 801 return self
546 802
547 803 def canpush(self):
548 804 # TODO change once implemented.
549 805 return False
550 806
551 807 def close(self):
552 808 pass
553 809
554 810 # End of ipeerconnection.
555 811
556 812 # Start of ipeercapabilities.
557 813
558 814 def capable(self, name):
559 815 # The capabilities used internally historically map to capabilities
560 816 # advertised from the "capabilities" wire protocol command. However,
561 817 # version 2 of that command works differently.
562 818
563 819 # Maps to commands that are available.
564 820 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
565 821 return True
566 822
567 823 # Other concepts.
568 824 if name in ('bundle2',):
569 825 return True
570 826
571 827 return False
572 828
573 829 def requirecap(self, name, purpose):
574 830 if self.capable(name):
575 831 return
576 832
577 833 raise error.CapabilityError(
578 834 _('cannot %s; client or remote repository does not support the %r '
579 835 'capability') % (purpose, name))
580 836
581 837 # End of ipeercapabilities.
582 838
583 # TODO require to be part of a batched primitive, use futures.
584 839 def _call(self, name, **args):
585 """Call a wire protocol command with arguments."""
586
587 # Having this early has a side-effect of importing wireprotov2server,
588 # which has the side-effect of ensuring commands are registered.
589
590 # TODO modify user-agent to reflect v2.
591 headers = {
592 r'Accept': wireprotov2server.FRAMINGTYPE,
593 r'Content-Type': wireprotov2server.FRAMINGTYPE,
594 }
595
596 # TODO permissions should come from capabilities results.
597 permission = wireproto.commandsv2[name].permission
598 if permission not in ('push', 'pull'):
599 raise error.ProgrammingError('unknown permission type: %s' %
600 permission)
601
602 permission = {
603 'push': 'rw',
604 'pull': 'ro',
605 }[permission]
606
607 url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
608
609 # TODO this should be part of a generic peer for the frame-based
610 # protocol.
611 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
612 buffersends=True)
613
614 request, action, meta = reactor.callcommand(name, args)
615 assert action == 'noop'
616
617 action, meta = reactor.flushcommands()
618 assert action == 'sendframes'
840 with self.commandexecutor() as e:
841 return e.callcommand(name, args).result()
619 842
620 body = b''.join(map(bytes, meta['framegen']))
621 req = self._requestbuilder(pycompat.strurl(url), body, headers)
622 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
623
624 # TODO unify this code with httppeer.
625 try:
626 res = self._opener.open(req)
627 except urlerr.httperror as e:
628 if e.code == 401:
629 raise error.Abort(_('authorization failed'))
630
631 raise
632 except httplib.HTTPException as e:
633 self.ui.traceback()
634 raise IOError(None, e)
635
636 # TODO validate response type, wrap response to handle I/O errors.
637 # TODO more robust frame receiver.
638 results = []
639
640 while True:
641 frame = wireprotoframing.readframe(res)
642 if frame is None:
643 break
644
645 self.ui.note(_('received %r\n') % frame)
646
647 action, meta = reactor.onframerecv(frame)
648
649 if action == 'responsedata':
650 if meta['cbor']:
651 payload = util.bytesio(meta['data'])
652
653 decoder = cbor.CBORDecoder(payload)
654 while payload.tell() + 1 < len(meta['data']):
655 results.append(decoder.decode())
656 else:
657 results.append(meta['data'])
658 else:
659 error.ProgrammingError('unhandled action: %s' % action)
660
661 return results
843 def commandexecutor(self):
844 return httpv2executor(self.ui, self._opener, self._requestbuilder,
845 self._apiurl, self._descriptor)
662 846
663 847 # Registry of API service names to metadata about peers that handle it.
664 848 #
665 849 # The following keys are meaningful:
666 850 #
667 851 # init
668 852 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
669 853 # apidescriptor) to create a peer.
670 854 #
671 855 # priority
672 856 # Integer priority for the service. If we could choose from multiple
673 857 # services, we choose the one with the highest priority.
674 858 API_PEERS = {
675 859 wireprototypes.HTTP_WIREPROTO_V2: {
676 860 'init': httpv2peer,
677 861 'priority': 50,
678 862 },
679 863 }
680 864
681 865 def performhandshake(ui, url, opener, requestbuilder):
682 866 # The handshake is a request to the capabilities command.
683 867
684 868 caps = None
685 869 def capable(x):
686 870 raise error.ProgrammingError('should not be called')
687 871
688 872 args = {}
689 873
690 874 # The client advertises support for newer protocols by adding an
691 875 # X-HgUpgrade-* header with a list of supported APIs and an
692 876 # X-HgProto-* header advertising which serializing formats it supports.
693 877 # We only support the HTTP version 2 transport and CBOR responses for
694 878 # now.
695 879 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
696 880
697 881 if advertisev2:
698 882 args['headers'] = {
699 883 r'X-HgProto-1': r'cbor',
700 884 }
701 885
702 886 args['headers'].update(
703 887 encodevalueinheaders(' '.join(sorted(API_PEERS)),
704 888 'X-HgUpgrade',
705 889 # We don't know the header limit this early.
706 890 # So make it small.
707 891 1024))
708 892
709 893 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
710 894 capable, url, 'capabilities',
711 895 args)
712 896
713 897 resp = sendrequest(ui, opener, req)
714 898
715 899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
716 900 compressible=False,
717 901 allowcbor=advertisev2)
718 902
719 903 try:
720 904 rawdata = resp.read()
721 905 finally:
722 906 resp.close()
723 907
724 908 if not ct.startswith('application/mercurial-'):
725 909 raise error.ProgrammingError('unexpected content-type: %s' % ct)
726 910
727 911 if advertisev2:
728 912 if ct == 'application/mercurial-cbor':
729 913 try:
730 914 info = cbor.loads(rawdata)
731 915 except cbor.CBORDecodeError:
732 916 raise error.Abort(_('error decoding CBOR from remote server'),
733 917 hint=_('try again and consider contacting '
734 918 'the server operator'))
735 919
736 920 # We got a legacy response. That's fine.
737 921 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
738 922 info = {
739 923 'v1capabilities': set(rawdata.split())
740 924 }
741 925
742 926 else:
743 927 raise error.RepoError(
744 928 _('unexpected response type from server: %s') % ct)
745 929 else:
746 930 info = {
747 931 'v1capabilities': set(rawdata.split())
748 932 }
749 933
750 934 return respurl, info
751 935
752 936 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
753 937 """Construct an appropriate HTTP peer instance.
754 938
755 939 ``opener`` is an ``url.opener`` that should be used to establish
756 940 connections, perform HTTP requests.
757 941
758 942 ``requestbuilder`` is the type used for constructing HTTP requests.
759 943 It exists as an argument so extensions can override the default.
760 944 """
761 945 u = util.url(path)
762 946 if u.query or u.fragment:
763 947 raise error.Abort(_('unsupported URL component: "%s"') %
764 948 (u.query or u.fragment))
765 949
766 950 # urllib cannot handle URLs with embedded user or passwd.
767 951 url, authinfo = u.authinfo()
768 952 ui.debug('using %s\n' % url)
769 953
770 954 opener = opener or urlmod.opener(ui, authinfo)
771 955
772 956 respurl, info = performhandshake(ui, url, opener, requestbuilder)
773 957
774 958 # Given the intersection of APIs that both we and the server support,
775 959 # sort by their advertised priority and pick the first one.
776 960 #
777 961 # TODO consider making this request-based and interface driven. For
778 962 # example, the caller could say "I want a peer that does X." It's quite
779 963 # possible that not all peers would do that. Since we know the service
780 964 # capabilities, we could filter out services not meeting the
781 965 # requirements. Possibly by consulting the interfaces defined by the
782 966 # peer type.
783 967 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
784 968
785 969 preferredchoices = sorted(apipeerchoices,
786 970 key=lambda x: API_PEERS[x]['priority'],
787 971 reverse=True)
788 972
789 973 for service in preferredchoices:
790 974 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
791 975
792 976 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
793 977 requestbuilder,
794 978 info['apis'][service])
795 979
796 980 # Failed to construct an API peer. Fall back to legacy.
797 981 return httppeer(ui, path, respurl, opener, requestbuilder,
798 982 info['v1capabilities'])
799 983
800 984 def instance(ui, path, create):
801 985 if create:
802 986 raise error.Abort(_('cannot create new http repository'))
803 987 try:
804 988 if path.startswith('https:') and not urlmod.has_https:
805 989 raise error.Abort(_('Python support for SSL and HTTPS '
806 990 'is not installed'))
807 991
808 992 inst = makepeer(ui, path)
809 993
810 994 return inst
811 995 except error.RepoError as httpexception:
812 996 try:
813 997 r = statichttprepo.instance(ui, "static-" + path, create)
814 998 ui.note(_('(falling back to static-http)\n'))
815 999 return r
816 1000 except error.RepoError:
817 1001 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now