##// END OF EJS Templates
httppeer: use context manager when writing temporary bundle to send...
Martin von Zweigbergk -
r43118:58f73e9c default
parent child Browse files
Show More
@@ -1,1012 +1,1009 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 statichttprepo,
25 25 url as urlmod,
26 26 util,
27 27 wireprotoframing,
28 28 wireprototypes,
29 29 wireprotov1peer,
30 30 wireprotov2peer,
31 31 wireprotov2server,
32 32 )
33 33 from .interfaces import (
34 34 repository,
35 35 util as interfaceutil,
36 36 )
37 37 from .utils import (
38 38 cborutil,
39 39 stringutil,
40 40 )
41 41
42 42 httplib = util.httplib
43 43 urlerr = util.urlerr
44 44 urlreq = util.urlreq
45 45
46 46 def encodevalueinheaders(value, header, limit):
47 47 """Encode a string value into multiple HTTP headers.
48 48
49 49 ``value`` will be encoded into 1 or more HTTP headers with the names
50 50 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
51 51 name + value will be at most ``limit`` bytes long.
52 52
53 53 Returns an iterable of 2-tuples consisting of header names and
54 54 values as native strings.
55 55 """
56 56 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
57 57 # not bytes. This function always takes bytes in as arguments.
58 58 fmt = pycompat.strurl(header) + r'-%s'
59 59 # Note: it is *NOT* a bug that the last bit here is a bytestring
60 60 # and not a unicode: we're just getting the encoded length anyway,
61 61 # and using an r-string to make it portable between Python 2 and 3
62 62 # doesn't work because then the \r is a literal backslash-r
63 63 # instead of a carriage return.
64 64 valuelen = limit - len(fmt % r'000') - len(': \r\n')
65 65 result = []
66 66
67 67 n = 0
68 68 for i in pycompat.xrange(0, len(value), valuelen):
69 69 n += 1
70 70 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
71 71
72 72 return result
73 73
74 74 class _multifile(object):
75 75 def __init__(self, *fileobjs):
76 76 for f in fileobjs:
77 77 if not util.safehasattr(f, 'length'):
78 78 raise ValueError(
79 79 '_multifile only supports file objects that '
80 80 'have a length but this one does not:', type(f), f)
81 81 self._fileobjs = fileobjs
82 82 self._index = 0
83 83
84 84 @property
85 85 def length(self):
86 86 return sum(f.length for f in self._fileobjs)
87 87
88 88 def read(self, amt=None):
89 89 if amt <= 0:
90 90 return ''.join(f.read() for f in self._fileobjs)
91 91 parts = []
92 92 while amt and self._index < len(self._fileobjs):
93 93 parts.append(self._fileobjs[self._index].read(amt))
94 94 got = len(parts[-1])
95 95 if got < amt:
96 96 self._index += 1
97 97 amt -= got
98 98 return ''.join(parts)
99 99
100 100 def seek(self, offset, whence=os.SEEK_SET):
101 101 if whence != os.SEEK_SET:
102 102 raise NotImplementedError(
103 103 '_multifile does not support anything other'
104 104 ' than os.SEEK_SET for whence on seek()')
105 105 if offset != 0:
106 106 raise NotImplementedError(
107 107 '_multifile only supports seeking to start, but that '
108 108 'could be fixed if you need it')
109 109 for f in self._fileobjs:
110 110 f.seek(0)
111 111 self._index = 0
112 112
113 113 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
114 114 repobaseurl, cmd, args):
115 115 """Make an HTTP request to run a command for a version 1 client.
116 116
117 117 ``caps`` is a set of known server capabilities. The value may be
118 118 None if capabilities are not yet known.
119 119
120 120 ``capablefn`` is a function to evaluate a capability.
121 121
122 122 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
123 123 raw data to pass to it.
124 124 """
125 125 if cmd == 'pushkey':
126 126 args['data'] = ''
127 127 data = args.pop('data', None)
128 128 headers = args.pop('headers', {})
129 129
130 130 ui.debug("sending %s command\n" % cmd)
131 131 q = [('cmd', cmd)]
132 132 headersize = 0
133 133 # Important: don't use self.capable() here or else you end up
134 134 # with infinite recursion when trying to look up capabilities
135 135 # for the first time.
136 136 postargsok = caps is not None and 'httppostargs' in caps
137 137
138 138 # Send arguments via POST.
139 139 if postargsok and args:
140 140 strargs = urlreq.urlencode(sorted(args.items()))
141 141 if not data:
142 142 data = strargs
143 143 else:
144 144 if isinstance(data, bytes):
145 145 i = io.BytesIO(data)
146 146 i.length = len(data)
147 147 data = i
148 148 argsio = io.BytesIO(strargs)
149 149 argsio.length = len(strargs)
150 150 data = _multifile(argsio, data)
151 151 headers[r'X-HgArgs-Post'] = len(strargs)
152 152 elif args:
153 153 # Calling self.capable() can infinite loop if we are calling
154 154 # "capabilities". But that command should never accept wire
155 155 # protocol arguments. So this should never happen.
156 156 assert cmd != 'capabilities'
157 157 httpheader = capablefn('httpheader')
158 158 if httpheader:
159 159 headersize = int(httpheader.split(',', 1)[0])
160 160
161 161 # Send arguments via HTTP headers.
162 162 if headersize > 0:
163 163 # The headers can typically carry more data than the URL.
164 164 encargs = urlreq.urlencode(sorted(args.items()))
165 165 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
166 166 headersize):
167 167 headers[header] = value
168 168 # Send arguments via query string (Mercurial <1.9).
169 169 else:
170 170 q += sorted(args.items())
171 171
172 172 qs = '?%s' % urlreq.urlencode(q)
173 173 cu = "%s%s" % (repobaseurl, qs)
174 174 size = 0
175 175 if util.safehasattr(data, 'length'):
176 176 size = data.length
177 177 elif data is not None:
178 178 size = len(data)
179 179 if data is not None and r'Content-Type' not in headers:
180 180 headers[r'Content-Type'] = r'application/mercurial-0.1'
181 181
182 182 # Tell the server we accept application/mercurial-0.2 and multiple
183 183 # compression formats if the server is capable of emitting those
184 184 # payloads.
185 185 # Note: Keep this set empty by default, as client advertisement of
186 186 # protocol parameters should only occur after the handshake.
187 187 protoparams = set()
188 188
189 189 mediatypes = set()
190 190 if caps is not None:
191 191 mt = capablefn('httpmediatype')
192 192 if mt:
193 193 protoparams.add('0.1')
194 194 mediatypes = set(mt.split(','))
195 195
196 196 protoparams.add('partial-pull')
197 197
198 198 if '0.2tx' in mediatypes:
199 199 protoparams.add('0.2')
200 200
201 201 if '0.2tx' in mediatypes and capablefn('compression'):
202 202 # We /could/ compare supported compression formats and prune
203 203 # non-mutually supported or error if nothing is mutually supported.
204 204 # For now, send the full list to the server and have it error.
205 205 comps = [e.wireprotosupport().name for e in
206 206 util.compengines.supportedwireengines(util.CLIENTROLE)]
207 207 protoparams.add('comp=%s' % ','.join(comps))
208 208
209 209 if protoparams:
210 210 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
211 211 'X-HgProto',
212 212 headersize or 1024)
213 213 for header, value in protoheaders:
214 214 headers[header] = value
215 215
216 216 varyheaders = []
217 217 for header in headers:
218 218 if header.lower().startswith(r'x-hg'):
219 219 varyheaders.append(header)
220 220
221 221 if varyheaders:
222 222 headers[r'Vary'] = r','.join(sorted(varyheaders))
223 223
224 224 req = requestbuilder(pycompat.strurl(cu), data, headers)
225 225
226 226 if data is not None:
227 227 ui.debug("sending %d bytes\n" % size)
228 228 req.add_unredirected_header(r'Content-Length', r'%d' % size)
229 229
230 230 return req, cu, qs
231 231
232 232 def _reqdata(req):
233 233 """Get request data, if any. If no data, returns None."""
234 234 if pycompat.ispy3:
235 235 return req.data
236 236 if not req.has_data():
237 237 return None
238 238 return req.get_data()
239 239
240 240 def sendrequest(ui, opener, req):
241 241 """Send a prepared HTTP request.
242 242
243 243 Returns the response object.
244 244 """
245 245 dbg = ui.debug
246 246 if (ui.debugflag
247 247 and ui.configbool('devel', 'debug.peer-request')):
248 248 line = 'devel-peer-request: %s\n'
249 249 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
250 250 pycompat.bytesurl(req.get_full_url())))
251 251 hgargssize = None
252 252
253 253 for header, value in sorted(req.header_items()):
254 254 header = pycompat.bytesurl(header)
255 255 value = pycompat.bytesurl(value)
256 256 if header.startswith('X-hgarg-'):
257 257 if hgargssize is None:
258 258 hgargssize = 0
259 259 hgargssize += len(value)
260 260 else:
261 261 dbg(line % ' %s %s' % (header, value))
262 262
263 263 if hgargssize is not None:
264 264 dbg(line % ' %d bytes of commands arguments in headers'
265 265 % hgargssize)
266 266 data = _reqdata(req)
267 267 if data is not None:
268 268 length = getattr(data, 'length', None)
269 269 if length is None:
270 270 length = len(data)
271 271 dbg(line % ' %d bytes of data' % length)
272 272
273 273 start = util.timer()
274 274
275 275 res = None
276 276 try:
277 277 res = opener.open(req)
278 278 except urlerr.httperror as inst:
279 279 if inst.code == 401:
280 280 raise error.Abort(_('authorization failed'))
281 281 raise
282 282 except httplib.HTTPException as inst:
283 283 ui.debug('http error requesting %s\n' %
284 284 util.hidepassword(req.get_full_url()))
285 285 ui.traceback()
286 286 raise IOError(None, inst)
287 287 finally:
288 288 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
289 289 code = res.code if res else -1
290 290 dbg(line % ' finished in %.4f seconds (%d)'
291 291 % (util.timer() - start, code))
292 292
293 293 # Insert error handlers for common I/O failures.
294 294 urlmod.wrapresponse(res)
295 295
296 296 return res
297 297
298 298 class RedirectedRepoError(error.RepoError):
299 299 def __init__(self, msg, respurl):
300 300 super(RedirectedRepoError, self).__init__(msg)
301 301 self.respurl = respurl
302 302
303 303 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
304 304 allowcbor=False):
305 305 # record the url we got redirected to
306 306 redirected = False
307 307 respurl = pycompat.bytesurl(resp.geturl())
308 308 if respurl.endswith(qs):
309 309 respurl = respurl[:-len(qs)]
310 310 qsdropped = False
311 311 else:
312 312 qsdropped = True
313 313
314 314 if baseurl.rstrip('/') != respurl.rstrip('/'):
315 315 redirected = True
316 316 if not ui.quiet:
317 317 ui.warn(_('real URL is %s\n') % respurl)
318 318
319 319 try:
320 320 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
321 321 except AttributeError:
322 322 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
323 323
324 324 safeurl = util.hidepassword(baseurl)
325 325 if proto.startswith('application/hg-error'):
326 326 raise error.OutOfBandError(resp.read())
327 327
328 328 # Pre 1.0 versions of Mercurial used text/plain and
329 329 # application/hg-changegroup. We don't support such old servers.
330 330 if not proto.startswith('application/mercurial-'):
331 331 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
332 332 msg = _("'%s' does not appear to be an hg repository:\n"
333 333 "---%%<--- (%s)\n%s\n---%%<---\n") % (
334 334 safeurl, proto or 'no content-type', resp.read(1024))
335 335
336 336 # Some servers may strip the query string from the redirect. We
337 337 # raise a special error type so callers can react to this specially.
338 338 if redirected and qsdropped:
339 339 raise RedirectedRepoError(msg, respurl)
340 340 else:
341 341 raise error.RepoError(msg)
342 342
343 343 try:
344 344 subtype = proto.split('-', 1)[1]
345 345
346 346 # Unless we end up supporting CBOR in the legacy wire protocol,
347 347 # this should ONLY be encountered for the initial capabilities
348 348 # request during handshake.
349 349 if subtype == 'cbor':
350 350 if allowcbor:
351 351 return respurl, proto, resp
352 352 else:
353 353 raise error.RepoError(_('unexpected CBOR response from '
354 354 'server'))
355 355
356 356 version_info = tuple([int(n) for n in subtype.split('.')])
357 357 except ValueError:
358 358 raise error.RepoError(_("'%s' sent a broken Content-Type "
359 359 "header (%s)") % (safeurl, proto))
360 360
361 361 # TODO consider switching to a decompression reader that uses
362 362 # generators.
363 363 if version_info == (0, 1):
364 364 if compressible:
365 365 resp = util.compengines['zlib'].decompressorreader(resp)
366 366
367 367 elif version_info == (0, 2):
368 368 # application/mercurial-0.2 always identifies the compression
369 369 # engine in the payload header.
370 370 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
371 371 ename = util.readexactly(resp, elen)
372 372 engine = util.compengines.forwiretype(ename)
373 373
374 374 resp = engine.decompressorreader(resp)
375 375 else:
376 376 raise error.RepoError(_("'%s' uses newer protocol %s") %
377 377 (safeurl, subtype))
378 378
379 379 return respurl, proto, resp
380 380
381 381 class httppeer(wireprotov1peer.wirepeer):
382 382 def __init__(self, ui, path, url, opener, requestbuilder, caps):
383 383 self.ui = ui
384 384 self._path = path
385 385 self._url = url
386 386 self._caps = caps
387 387 self.limitedarguments = caps is not None and 'httppostargs' not in caps
388 388 self._urlopener = opener
389 389 self._requestbuilder = requestbuilder
390 390
391 391 def __del__(self):
392 392 for h in self._urlopener.handlers:
393 393 h.close()
394 394 getattr(h, "close_all", lambda: None)()
395 395
396 396 # Begin of ipeerconnection interface.
397 397
398 398 def url(self):
399 399 return self._path
400 400
401 401 def local(self):
402 402 return None
403 403
404 404 def peer(self):
405 405 return self
406 406
407 407 def canpush(self):
408 408 return True
409 409
410 410 def close(self):
411 411 try:
412 412 reqs, sent, recv = (self._urlopener.requestscount,
413 413 self._urlopener.sentbytescount,
414 414 self._urlopener.receivedbytescount)
415 415 except AttributeError:
416 416 return
417 417 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
418 418 'received %d bytes in responses)\n') %
419 419 (reqs, sent, recv))
420 420
421 421 # End of ipeerconnection interface.
422 422
423 423 # Begin of ipeercommands interface.
424 424
425 425 def capabilities(self):
426 426 return self._caps
427 427
428 428 # End of ipeercommands interface.
429 429
430 430 def _callstream(self, cmd, _compressible=False, **args):
431 431 args = pycompat.byteskwargs(args)
432 432
433 433 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
434 434 self._caps, self.capable,
435 435 self._url, cmd, args)
436 436
437 437 resp = sendrequest(self.ui, self._urlopener, req)
438 438
439 439 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
440 440 resp, _compressible)
441 441
442 442 return resp
443 443
444 444 def _call(self, cmd, **args):
445 445 fp = self._callstream(cmd, **args)
446 446 try:
447 447 return fp.read()
448 448 finally:
449 449 # if using keepalive, allow connection to be reused
450 450 fp.close()
451 451
452 452 def _callpush(self, cmd, cg, **args):
453 453 # have to stream bundle to a temp file because we do not have
454 454 # http 1.1 chunked transfer.
455 455
456 456 types = self.capable('unbundle')
457 457 try:
458 458 types = types.split(',')
459 459 except AttributeError:
460 460 # servers older than d1b16a746db6 will send 'unbundle' as a
461 461 # boolean capability. They only support headerless/uncompressed
462 462 # bundles.
463 463 types = [""]
464 464 for x in types:
465 465 if x in bundle2.bundletypes:
466 466 type = x
467 467 break
468 468
469 469 tempname = bundle2.writebundle(self.ui, cg, None, type)
470 470 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
471 471 headers = {r'Content-Type': r'application/mercurial-0.1'}
472 472
473 473 try:
474 474 r = self._call(cmd, data=fp, headers=headers, **args)
475 475 vals = r.split('\n', 1)
476 476 if len(vals) < 2:
477 477 raise error.ResponseError(_("unexpected response:"), r)
478 478 return vals
479 479 except urlerr.httperror:
480 480 # Catch and re-raise these so we don't try and treat them
481 481 # like generic socket errors. They lack any values in
482 482 # .args on Python 3 which breaks our socket.error block.
483 483 raise
484 484 except socket.error as err:
485 485 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
486 486 raise error.Abort(_('push failed: %s') % err.args[1])
487 487 raise error.Abort(err.args[1])
488 488 finally:
489 489 fp.close()
490 490 os.unlink(tempname)
491 491
492 492 def _calltwowaystream(self, cmd, fp, **args):
493 fh = None
494 493 fp_ = None
495 494 filename = None
496 495 try:
497 496 # dump bundle to disk
498 497 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
499 fh = os.fdopen(fd, r"wb")
500 d = fp.read(4096)
501 while d:
502 fh.write(d)
498 with os.fdopen(fd, r"wb") as fh:
503 499 d = fp.read(4096)
504 fh.close()
500 while d:
501 fh.write(d)
502 d = fp.read(4096)
505 503 # start http push
506 504 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
507 505 headers = {r'Content-Type': r'application/mercurial-0.1'}
508 506 return self._callstream(cmd, data=fp_, headers=headers, **args)
509 507 finally:
510 508 if fp_ is not None:
511 509 fp_.close()
512 if fh is not None:
513 fh.close()
510 if filename is not None:
514 511 os.unlink(filename)
515 512
516 513 def _callcompressable(self, cmd, **args):
517 514 return self._callstream(cmd, _compressible=True, **args)
518 515
519 516 def _abort(self, exception):
520 517 raise exception
521 518
522 519 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests,
523 520 redirect):
524 521 wireprotoframing.populatestreamencoders()
525 522
526 523 uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
527 524
528 525 if uiencoders:
529 526 encoders = []
530 527
531 528 for encoder in uiencoders:
532 529 if encoder not in wireprotoframing.STREAM_ENCODERS:
533 530 ui.warn(_(b'wire protocol version 2 encoder referenced in '
534 531 b'config (%s) is not known; ignoring\n') % encoder)
535 532 else:
536 533 encoders.append(encoder)
537 534
538 535 else:
539 536 encoders = wireprotoframing.STREAM_ENCODERS_ORDER
540 537
541 538 reactor = wireprotoframing.clientreactor(ui,
542 539 hasmultiplesend=False,
543 540 buffersends=True,
544 541 clientcontentencoders=encoders)
545 542
546 543 handler = wireprotov2peer.clienthandler(ui, reactor,
547 544 opener=opener,
548 545 requestbuilder=requestbuilder)
549 546
550 547 url = '%s/%s' % (apiurl, permission)
551 548
552 549 if len(requests) > 1:
553 550 url += '/multirequest'
554 551 else:
555 552 url += '/%s' % requests[0][0]
556 553
557 554 ui.debug('sending %d commands\n' % len(requests))
558 555 for command, args, f in requests:
559 556 ui.debug('sending command %s: %s\n' % (
560 557 command, stringutil.pprint(args, indent=2)))
561 558 assert not list(handler.callcommand(command, args, f,
562 559 redirect=redirect))
563 560
564 561 # TODO stream this.
565 562 body = b''.join(map(bytes, handler.flushcommands()))
566 563
567 564 # TODO modify user-agent to reflect v2
568 565 headers = {
569 566 r'Accept': wireprotov2server.FRAMINGTYPE,
570 567 r'Content-Type': wireprotov2server.FRAMINGTYPE,
571 568 }
572 569
573 570 req = requestbuilder(pycompat.strurl(url), body, headers)
574 571 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
575 572
576 573 try:
577 574 res = opener.open(req)
578 575 except urlerr.httperror as e:
579 576 if e.code == 401:
580 577 raise error.Abort(_('authorization failed'))
581 578
582 579 raise
583 580 except httplib.HTTPException as e:
584 581 ui.traceback()
585 582 raise IOError(None, e)
586 583
587 584 return handler, res
588 585
589 586 class queuedcommandfuture(pycompat.futures.Future):
590 587 """Wraps result() on command futures to trigger submission on call."""
591 588
592 589 def result(self, timeout=None):
593 590 if self.done():
594 591 return pycompat.futures.Future.result(self, timeout)
595 592
596 593 self._peerexecutor.sendcommands()
597 594
598 595 # sendcommands() will restore the original __class__ and self.result
599 596 # will resolve to Future.result.
600 597 return self.result(timeout)
601 598
602 599 @interfaceutil.implementer(repository.ipeercommandexecutor)
603 600 class httpv2executor(object):
604 601 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor,
605 602 redirect):
606 603 self._ui = ui
607 604 self._opener = opener
608 605 self._requestbuilder = requestbuilder
609 606 self._apiurl = apiurl
610 607 self._descriptor = descriptor
611 608 self._redirect = redirect
612 609 self._sent = False
613 610 self._closed = False
614 611 self._neededpermissions = set()
615 612 self._calls = []
616 613 self._futures = weakref.WeakSet()
617 614 self._responseexecutor = None
618 615 self._responsef = None
619 616
620 617 def __enter__(self):
621 618 return self
622 619
623 620 def __exit__(self, exctype, excvalue, exctb):
624 621 self.close()
625 622
626 623 def callcommand(self, command, args):
627 624 if self._sent:
628 625 raise error.ProgrammingError('callcommand() cannot be used after '
629 626 'commands are sent')
630 627
631 628 if self._closed:
632 629 raise error.ProgrammingError('callcommand() cannot be used after '
633 630 'close()')
634 631
635 632 # The service advertises which commands are available. So if we attempt
636 633 # to call an unknown command or pass an unknown argument, we can screen
637 634 # for this.
638 635 if command not in self._descriptor['commands']:
639 636 raise error.ProgrammingError(
640 637 'wire protocol command %s is not available' % command)
641 638
642 639 cmdinfo = self._descriptor['commands'][command]
643 640 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
644 641
645 642 if unknownargs:
646 643 raise error.ProgrammingError(
647 644 'wire protocol command %s does not accept argument: %s' % (
648 645 command, ', '.join(sorted(unknownargs))))
649 646
650 647 self._neededpermissions |= set(cmdinfo['permissions'])
651 648
652 649 # TODO we /could/ also validate types here, since the API descriptor
653 650 # includes types...
654 651
655 652 f = pycompat.futures.Future()
656 653
657 654 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
658 655 # could deadlock.
659 656 f.__class__ = queuedcommandfuture
660 657 f._peerexecutor = self
661 658
662 659 self._futures.add(f)
663 660 self._calls.append((command, args, f))
664 661
665 662 return f
666 663
667 664 def sendcommands(self):
668 665 if self._sent:
669 666 return
670 667
671 668 if not self._calls:
672 669 return
673 670
674 671 self._sent = True
675 672
676 673 # Unhack any future types so caller sees a clean type and so we
677 674 # break reference cycle.
678 675 for f in self._futures:
679 676 if isinstance(f, queuedcommandfuture):
680 677 f.__class__ = pycompat.futures.Future
681 678 f._peerexecutor = None
682 679
683 680 # Mark the future as running and filter out cancelled futures.
684 681 calls = [(command, args, f)
685 682 for command, args, f in self._calls
686 683 if f.set_running_or_notify_cancel()]
687 684
688 685 # Clear out references, prevent improper object usage.
689 686 self._calls = None
690 687
691 688 if not calls:
692 689 return
693 690
694 691 permissions = set(self._neededpermissions)
695 692
696 693 if 'push' in permissions and 'pull' in permissions:
697 694 permissions.remove('pull')
698 695
699 696 if len(permissions) > 1:
700 697 raise error.RepoError(_('cannot make request requiring multiple '
701 698 'permissions: %s') %
702 699 _(', ').join(sorted(permissions)))
703 700
704 701 permission = {
705 702 'push': 'rw',
706 703 'pull': 'ro',
707 704 }[permissions.pop()]
708 705
709 706 handler, resp = sendv2request(
710 707 self._ui, self._opener, self._requestbuilder, self._apiurl,
711 708 permission, calls, self._redirect)
712 709
713 710 # TODO we probably want to validate the HTTP code, media type, etc.
714 711
715 712 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
716 713 self._responsef = self._responseexecutor.submit(self._handleresponse,
717 714 handler, resp)
718 715
719 716 def close(self):
720 717 if self._closed:
721 718 return
722 719
723 720 self.sendcommands()
724 721
725 722 self._closed = True
726 723
727 724 if not self._responsef:
728 725 return
729 726
730 727 # TODO ^C here may not result in immediate program termination.
731 728
732 729 try:
733 730 self._responsef.result()
734 731 finally:
735 732 self._responseexecutor.shutdown(wait=True)
736 733 self._responsef = None
737 734 self._responseexecutor = None
738 735
739 736 # If any of our futures are still in progress, mark them as
740 737 # errored, otherwise a result() could wait indefinitely.
741 738 for f in self._futures:
742 739 if not f.done():
743 740 f.set_exception(error.ResponseError(
744 741 _('unfulfilled command response')))
745 742
746 743 self._futures = None
747 744
748 745 def _handleresponse(self, handler, resp):
749 746 # Called in a thread to read the response.
750 747
751 748 while handler.readdata(resp):
752 749 pass
753 750
754 751 @interfaceutil.implementer(repository.ipeerv2)
755 752 class httpv2peer(object):
756 753
757 754 limitedarguments = False
758 755
759 756 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
760 757 apidescriptor):
761 758 self.ui = ui
762 759 self.apidescriptor = apidescriptor
763 760
764 761 if repourl.endswith('/'):
765 762 repourl = repourl[:-1]
766 763
767 764 self._url = repourl
768 765 self._apipath = apipath
769 766 self._apiurl = '%s/%s' % (repourl, apipath)
770 767 self._opener = opener
771 768 self._requestbuilder = requestbuilder
772 769
773 770 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
774 771
775 772 # Start of ipeerconnection.
776 773
777 774 def url(self):
778 775 return self._url
779 776
780 777 def local(self):
781 778 return None
782 779
783 780 def peer(self):
784 781 return self
785 782
786 783 def canpush(self):
787 784 # TODO change once implemented.
788 785 return False
789 786
790 787 def close(self):
791 788 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
792 789 'received %d bytes in responses)\n') %
793 790 (self._opener.requestscount,
794 791 self._opener.sentbytescount,
795 792 self._opener.receivedbytescount))
796 793
797 794 # End of ipeerconnection.
798 795
799 796 # Start of ipeercapabilities.
800 797
801 798 def capable(self, name):
802 799 # The capabilities used internally historically map to capabilities
803 800 # advertised from the "capabilities" wire protocol command. However,
804 801 # version 2 of that command works differently.
805 802
806 803 # Maps to commands that are available.
807 804 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
808 805 return True
809 806
810 807 # Other concepts.
811 808 if name in ('bundle2'):
812 809 return True
813 810
814 811 # Alias command-* to presence of command of that name.
815 812 if name.startswith('command-'):
816 813 return name[len('command-'):] in self.apidescriptor['commands']
817 814
818 815 return False
819 816
820 817 def requirecap(self, name, purpose):
821 818 if self.capable(name):
822 819 return
823 820
824 821 raise error.CapabilityError(
825 822 _('cannot %s; client or remote repository does not support the '
826 823 '\'%s\' capability') % (purpose, name))
827 824
828 825 # End of ipeercapabilities.
829 826
830 827 def _call(self, name, **args):
831 828 with self.commandexecutor() as e:
832 829 return e.callcommand(name, args).result()
833 830
834 831 def commandexecutor(self):
835 832 return httpv2executor(self.ui, self._opener, self._requestbuilder,
836 833 self._apiurl, self.apidescriptor, self._redirect)
837 834
838 835 # Registry of API service names to metadata about peers that handle it.
839 836 #
840 837 # The following keys are meaningful:
841 838 #
842 839 # init
843 840 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
844 841 # apidescriptor) to create a peer.
845 842 #
846 843 # priority
847 844 # Integer priority for the service. If we could choose from multiple
848 845 # services, we choose the one with the highest priority.
849 846 API_PEERS = {
850 847 wireprototypes.HTTP_WIREPROTO_V2: {
851 848 'init': httpv2peer,
852 849 'priority': 50,
853 850 },
854 851 }
855 852
856 853 def performhandshake(ui, url, opener, requestbuilder):
857 854 # The handshake is a request to the capabilities command.
858 855
859 856 caps = None
860 857 def capable(x):
861 858 raise error.ProgrammingError('should not be called')
862 859
863 860 args = {}
864 861
865 862 # The client advertises support for newer protocols by adding an
866 863 # X-HgUpgrade-* header with a list of supported APIs and an
867 864 # X-HgProto-* header advertising which serializing formats it supports.
868 865 # We only support the HTTP version 2 transport and CBOR responses for
869 866 # now.
870 867 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
871 868
872 869 if advertisev2:
873 870 args['headers'] = {
874 871 r'X-HgProto-1': r'cbor',
875 872 }
876 873
877 874 args['headers'].update(
878 875 encodevalueinheaders(' '.join(sorted(API_PEERS)),
879 876 'X-HgUpgrade',
880 877 # We don't know the header limit this early.
881 878 # So make it small.
882 879 1024))
883 880
884 881 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
885 882 capable, url, 'capabilities',
886 883 args)
887 884 resp = sendrequest(ui, opener, req)
888 885
889 886 # The server may redirect us to the repo root, stripping the
890 887 # ?cmd=capabilities query string from the URL. The server would likely
891 888 # return HTML in this case and ``parsev1commandresponse()`` would raise.
892 889 # We catch this special case and re-issue the capabilities request against
893 890 # the new URL.
894 891 #
895 892 # We should ideally not do this, as a redirect that drops the query
896 893 # string from the URL is arguably a server bug. (Garbage in, garbage out).
897 894 # However, Mercurial clients for several years appeared to handle this
898 895 # issue without behavior degradation. And according to issue 5860, it may
899 896 # be a longstanding bug in some server implementations. So we allow a
900 897 # redirect that drops the query string to "just work."
901 898 try:
902 899 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
903 900 compressible=False,
904 901 allowcbor=advertisev2)
905 902 except RedirectedRepoError as e:
906 903 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
907 904 capable, e.respurl,
908 905 'capabilities', args)
909 906 resp = sendrequest(ui, opener, req)
910 907 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
911 908 compressible=False,
912 909 allowcbor=advertisev2)
913 910
914 911 try:
915 912 rawdata = resp.read()
916 913 finally:
917 914 resp.close()
918 915
919 916 if not ct.startswith('application/mercurial-'):
920 917 raise error.ProgrammingError('unexpected content-type: %s' % ct)
921 918
922 919 if advertisev2:
923 920 if ct == 'application/mercurial-cbor':
924 921 try:
925 922 info = cborutil.decodeall(rawdata)[0]
926 923 except cborutil.CBORDecodeError:
927 924 raise error.Abort(_('error decoding CBOR from remote server'),
928 925 hint=_('try again and consider contacting '
929 926 'the server operator'))
930 927
931 928 # We got a legacy response. That's fine.
932 929 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
933 930 info = {
934 931 'v1capabilities': set(rawdata.split())
935 932 }
936 933
937 934 else:
938 935 raise error.RepoError(
939 936 _('unexpected response type from server: %s') % ct)
940 937 else:
941 938 info = {
942 939 'v1capabilities': set(rawdata.split())
943 940 }
944 941
945 942 return respurl, info
946 943
947 944 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
948 945 """Construct an appropriate HTTP peer instance.
949 946
950 947 ``opener`` is an ``url.opener`` that should be used to establish
951 948 connections, perform HTTP requests.
952 949
953 950 ``requestbuilder`` is the type used for constructing HTTP requests.
954 951 It exists as an argument so extensions can override the default.
955 952 """
956 953 u = util.url(path)
957 954 if u.query or u.fragment:
958 955 raise error.Abort(_('unsupported URL component: "%s"') %
959 956 (u.query or u.fragment))
960 957
961 958 # urllib cannot handle URLs with embedded user or passwd.
962 959 url, authinfo = u.authinfo()
963 960 ui.debug('using %s\n' % url)
964 961
965 962 opener = opener or urlmod.opener(ui, authinfo)
966 963
967 964 respurl, info = performhandshake(ui, url, opener, requestbuilder)
968 965
969 966 # Given the intersection of APIs that both we and the server support,
970 967 # sort by their advertised priority and pick the first one.
971 968 #
972 969 # TODO consider making this request-based and interface driven. For
973 970 # example, the caller could say "I want a peer that does X." It's quite
974 971 # possible that not all peers would do that. Since we know the service
975 972 # capabilities, we could filter out services not meeting the
976 973 # requirements. Possibly by consulting the interfaces defined by the
977 974 # peer type.
978 975 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
979 976
980 977 preferredchoices = sorted(apipeerchoices,
981 978 key=lambda x: API_PEERS[x]['priority'],
982 979 reverse=True)
983 980
984 981 for service in preferredchoices:
985 982 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
986 983
987 984 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
988 985 requestbuilder,
989 986 info['apis'][service])
990 987
991 988 # Failed to construct an API peer. Fall back to legacy.
992 989 return httppeer(ui, path, respurl, opener, requestbuilder,
993 990 info['v1capabilities'])
994 991
995 992 def instance(ui, path, create, intents=None, createopts=None):
996 993 if create:
997 994 raise error.Abort(_('cannot create new http repository'))
998 995 try:
999 996 if path.startswith('https:') and not urlmod.has_https:
1000 997 raise error.Abort(_('Python support for SSL and HTTPS '
1001 998 'is not installed'))
1002 999
1003 1000 inst = makepeer(ui, path)
1004 1001
1005 1002 return inst
1006 1003 except error.RepoError as httpexception:
1007 1004 try:
1008 1005 r = statichttprepo.instance(ui, "static-" + path, create)
1009 1006 ui.note(_('(falling back to static-http)\n'))
1010 1007 return r
1011 1008 except error.RepoError:
1012 1009 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now