##// END OF EJS Templates
wireproto: extract HTTP version 2 code to own module...
Gregory Szorc -
r37563:93397c46 default
parent child Browse files
Show More
@@ -1,602 +1,602 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17
18 18 from .i18n import _
19 19 from .thirdparty import (
20 20 cbor,
21 21 )
22 22 from . import (
23 23 bundle2,
24 24 error,
25 25 httpconnection,
26 26 pycompat,
27 27 statichttprepo,
28 28 url as urlmod,
29 29 util,
30 30 wireproto,
31 31 wireprotoframing,
32 wireprotoserver,
32 wireprotov2server,
33 33 )
34 34
35 35 httplib = util.httplib
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 def encodevalueinheaders(value, header, limit):
40 40 """Encode a string value into multiple HTTP headers.
41 41
42 42 ``value`` will be encoded into 1 or more HTTP headers with the names
43 43 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
44 44 name + value will be at most ``limit`` bytes long.
45 45
46 46 Returns an iterable of 2-tuples consisting of header names and
47 47 values as native strings.
48 48 """
49 49 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
50 50 # not bytes. This function always takes bytes in as arguments.
51 51 fmt = pycompat.strurl(header) + r'-%s'
52 52 # Note: it is *NOT* a bug that the last bit here is a bytestring
53 53 # and not a unicode: we're just getting the encoded length anyway,
54 54 # and using an r-string to make it portable between Python 2 and 3
55 55 # doesn't work because then the \r is a literal backslash-r
56 56 # instead of a carriage return.
57 57 valuelen = limit - len(fmt % r'000') - len(': \r\n')
58 58 result = []
59 59
60 60 n = 0
61 61 for i in xrange(0, len(value), valuelen):
62 62 n += 1
63 63 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
64 64
65 65 return result
66 66
67 67 def _wraphttpresponse(resp):
68 68 """Wrap an HTTPResponse with common error handlers.
69 69
70 70 This ensures that any I/O from any consumer raises the appropriate
71 71 error and messaging.
72 72 """
73 73 origread = resp.read
74 74
75 75 class readerproxy(resp.__class__):
76 76 def read(self, size=None):
77 77 try:
78 78 return origread(size)
79 79 except httplib.IncompleteRead as e:
80 80 # e.expected is an integer if length known or None otherwise.
81 81 if e.expected:
82 82 msg = _('HTTP request error (incomplete response; '
83 83 'expected %d bytes got %d)') % (e.expected,
84 84 len(e.partial))
85 85 else:
86 86 msg = _('HTTP request error (incomplete response)')
87 87
88 88 raise error.PeerTransportError(
89 89 msg,
90 90 hint=_('this may be an intermittent network failure; '
91 91 'if the error persists, consider contacting the '
92 92 'network or server operator'))
93 93 except httplib.HTTPException as e:
94 94 raise error.PeerTransportError(
95 95 _('HTTP request error (%s)') % e,
96 96 hint=_('this may be an intermittent network failure; '
97 97 'if the error persists, consider contacting the '
98 98 'network or server operator'))
99 99
100 100 resp.__class__ = readerproxy
101 101
102 102 class _multifile(object):
103 103 def __init__(self, *fileobjs):
104 104 for f in fileobjs:
105 105 if not util.safehasattr(f, 'length'):
106 106 raise ValueError(
107 107 '_multifile only supports file objects that '
108 108 'have a length but this one does not:', type(f), f)
109 109 self._fileobjs = fileobjs
110 110 self._index = 0
111 111
112 112 @property
113 113 def length(self):
114 114 return sum(f.length for f in self._fileobjs)
115 115
116 116 def read(self, amt=None):
117 117 if amt <= 0:
118 118 return ''.join(f.read() for f in self._fileobjs)
119 119 parts = []
120 120 while amt and self._index < len(self._fileobjs):
121 121 parts.append(self._fileobjs[self._index].read(amt))
122 122 got = len(parts[-1])
123 123 if got < amt:
124 124 self._index += 1
125 125 amt -= got
126 126 return ''.join(parts)
127 127
128 128 def seek(self, offset, whence=os.SEEK_SET):
129 129 if whence != os.SEEK_SET:
130 130 raise NotImplementedError(
131 131 '_multifile does not support anything other'
132 132 ' than os.SEEK_SET for whence on seek()')
133 133 if offset != 0:
134 134 raise NotImplementedError(
135 135 '_multifile only supports seeking to start, but that '
136 136 'could be fixed if you need it')
137 137 for f in self._fileobjs:
138 138 f.seek(0)
139 139 self._index = 0
140 140
141 141 class httppeer(wireproto.wirepeer):
142 142 def __init__(self, ui, path, url, opener):
143 143 self.ui = ui
144 144 self._path = path
145 145 self._url = url
146 146 self._caps = None
147 147 self._urlopener = opener
148 148 # This is an its own attribute to facilitate extensions overriding
149 149 # the default type.
150 150 self._requestbuilder = urlreq.request
151 151
152 152 def __del__(self):
153 153 for h in self._urlopener.handlers:
154 154 h.close()
155 155 getattr(h, "close_all", lambda: None)()
156 156
157 157 def _openurl(self, req):
158 158 if (self.ui.debugflag
159 159 and self.ui.configbool('devel', 'debug.peer-request')):
160 160 dbg = self.ui.debug
161 161 line = 'devel-peer-request: %s\n'
162 162 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
163 163 hgargssize = None
164 164
165 165 for header, value in sorted(req.header_items()):
166 166 if header.startswith('X-hgarg-'):
167 167 if hgargssize is None:
168 168 hgargssize = 0
169 169 hgargssize += len(value)
170 170 else:
171 171 dbg(line % ' %s %s' % (header, value))
172 172
173 173 if hgargssize is not None:
174 174 dbg(line % ' %d bytes of commands arguments in headers'
175 175 % hgargssize)
176 176
177 177 if req.has_data():
178 178 data = req.get_data()
179 179 length = getattr(data, 'length', None)
180 180 if length is None:
181 181 length = len(data)
182 182 dbg(line % ' %d bytes of data' % length)
183 183
184 184 start = util.timer()
185 185
186 186 ret = self._urlopener.open(req)
187 187 if self.ui.configbool('devel', 'debug.peer-request'):
188 188 dbg(line % ' finished in %.4f seconds (%s)'
189 189 % (util.timer() - start, ret.code))
190 190 return ret
191 191
192 192 # Begin of ipeerconnection interface.
193 193
194 194 def url(self):
195 195 return self._path
196 196
197 197 def local(self):
198 198 return None
199 199
200 200 def peer(self):
201 201 return self
202 202
203 203 def canpush(self):
204 204 return True
205 205
206 206 def close(self):
207 207 pass
208 208
209 209 # End of ipeerconnection interface.
210 210
211 211 # Begin of ipeercommands interface.
212 212
213 213 def capabilities(self):
214 214 # self._fetchcaps() should have been called as part of peer
215 215 # handshake. So self._caps should always be set.
216 216 assert self._caps is not None
217 217 return self._caps
218 218
219 219 # End of ipeercommands interface.
220 220
221 221 # look up capabilities only when needed
222 222
223 223 def _fetchcaps(self):
224 224 self._caps = set(self._call('capabilities').split())
225 225
226 226 def _callstream(self, cmd, _compressible=False, **args):
227 227 args = pycompat.byteskwargs(args)
228 228 if cmd == 'pushkey':
229 229 args['data'] = ''
230 230 data = args.pop('data', None)
231 231 headers = args.pop('headers', {})
232 232
233 233 self.ui.debug("sending %s command\n" % cmd)
234 234 q = [('cmd', cmd)]
235 235 headersize = 0
236 236 varyheaders = []
237 237 # Important: don't use self.capable() here or else you end up
238 238 # with infinite recursion when trying to look up capabilities
239 239 # for the first time.
240 240 postargsok = self._caps is not None and 'httppostargs' in self._caps
241 241
242 242 # Send arguments via POST.
243 243 if postargsok and args:
244 244 strargs = urlreq.urlencode(sorted(args.items()))
245 245 if not data:
246 246 data = strargs
247 247 else:
248 248 if isinstance(data, bytes):
249 249 i = io.BytesIO(data)
250 250 i.length = len(data)
251 251 data = i
252 252 argsio = io.BytesIO(strargs)
253 253 argsio.length = len(strargs)
254 254 data = _multifile(argsio, data)
255 255 headers[r'X-HgArgs-Post'] = len(strargs)
256 256 elif args:
257 257 # Calling self.capable() can infinite loop if we are calling
258 258 # "capabilities". But that command should never accept wire
259 259 # protocol arguments. So this should never happen.
260 260 assert cmd != 'capabilities'
261 261 httpheader = self.capable('httpheader')
262 262 if httpheader:
263 263 headersize = int(httpheader.split(',', 1)[0])
264 264
265 265 # Send arguments via HTTP headers.
266 266 if headersize > 0:
267 267 # The headers can typically carry more data than the URL.
268 268 encargs = urlreq.urlencode(sorted(args.items()))
269 269 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
270 270 headersize):
271 271 headers[header] = value
272 272 varyheaders.append(header)
273 273 # Send arguments via query string (Mercurial <1.9).
274 274 else:
275 275 q += sorted(args.items())
276 276
277 277 qs = '?%s' % urlreq.urlencode(q)
278 278 cu = "%s%s" % (self._url, qs)
279 279 size = 0
280 280 if util.safehasattr(data, 'length'):
281 281 size = data.length
282 282 elif data is not None:
283 283 size = len(data)
284 284 if data is not None and r'Content-Type' not in headers:
285 285 headers[r'Content-Type'] = r'application/mercurial-0.1'
286 286
287 287 # Tell the server we accept application/mercurial-0.2 and multiple
288 288 # compression formats if the server is capable of emitting those
289 289 # payloads.
290 290 protoparams = {'partial-pull'}
291 291
292 292 mediatypes = set()
293 293 if self._caps is not None:
294 294 mt = self.capable('httpmediatype')
295 295 if mt:
296 296 protoparams.add('0.1')
297 297 mediatypes = set(mt.split(','))
298 298
299 299 if '0.2tx' in mediatypes:
300 300 protoparams.add('0.2')
301 301
302 302 if '0.2tx' in mediatypes and self.capable('compression'):
303 303 # We /could/ compare supported compression formats and prune
304 304 # non-mutually supported or error if nothing is mutually supported.
305 305 # For now, send the full list to the server and have it error.
306 306 comps = [e.wireprotosupport().name for e in
307 307 util.compengines.supportedwireengines(util.CLIENTROLE)]
308 308 protoparams.add('comp=%s' % ','.join(comps))
309 309
310 310 if protoparams:
311 311 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
312 312 'X-HgProto',
313 313 headersize or 1024)
314 314 for header, value in protoheaders:
315 315 headers[header] = value
316 316 varyheaders.append(header)
317 317
318 318 if varyheaders:
319 319 headers[r'Vary'] = r','.join(varyheaders)
320 320
321 321 req = self._requestbuilder(pycompat.strurl(cu), data, headers)
322 322
323 323 if data is not None:
324 324 self.ui.debug("sending %d bytes\n" % size)
325 325 req.add_unredirected_header(r'Content-Length', r'%d' % size)
326 326 try:
327 327 resp = self._openurl(req)
328 328 except urlerr.httperror as inst:
329 329 if inst.code == 401:
330 330 raise error.Abort(_('authorization failed'))
331 331 raise
332 332 except httplib.HTTPException as inst:
333 333 self.ui.debug('http error while sending %s command\n' % cmd)
334 334 self.ui.traceback()
335 335 raise IOError(None, inst)
336 336
337 337 # Insert error handlers for common I/O failures.
338 338 _wraphttpresponse(resp)
339 339
340 340 # record the url we got redirected to
341 341 resp_url = pycompat.bytesurl(resp.geturl())
342 342 if resp_url.endswith(qs):
343 343 resp_url = resp_url[:-len(qs)]
344 344 if self._url.rstrip('/') != resp_url.rstrip('/'):
345 345 if not self.ui.quiet:
346 346 self.ui.warn(_('real URL is %s\n') % resp_url)
347 347 self._url = resp_url
348 348 try:
349 349 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
350 350 except AttributeError:
351 351 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
352 352
353 353 safeurl = util.hidepassword(self._url)
354 354 if proto.startswith('application/hg-error'):
355 355 raise error.OutOfBandError(resp.read())
356 356 # accept old "text/plain" and "application/hg-changegroup" for now
357 357 if not (proto.startswith('application/mercurial-') or
358 358 (proto.startswith('text/plain')
359 359 and not resp.headers.get('content-length')) or
360 360 proto.startswith('application/hg-changegroup')):
361 361 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
362 362 raise error.RepoError(
363 363 _("'%s' does not appear to be an hg repository:\n"
364 364 "---%%<--- (%s)\n%s\n---%%<---\n")
365 365 % (safeurl, proto or 'no content-type', resp.read(1024)))
366 366
367 367 if proto.startswith('application/mercurial-'):
368 368 try:
369 369 version = proto.split('-', 1)[1]
370 370 version_info = tuple([int(n) for n in version.split('.')])
371 371 except ValueError:
372 372 raise error.RepoError(_("'%s' sent a broken Content-Type "
373 373 "header (%s)") % (safeurl, proto))
374 374
375 375 # TODO consider switching to a decompression reader that uses
376 376 # generators.
377 377 if version_info == (0, 1):
378 378 if _compressible:
379 379 return util.compengines['zlib'].decompressorreader(resp)
380 380 return resp
381 381 elif version_info == (0, 2):
382 382 # application/mercurial-0.2 always identifies the compression
383 383 # engine in the payload header.
384 384 elen = struct.unpack('B', resp.read(1))[0]
385 385 ename = resp.read(elen)
386 386 engine = util.compengines.forwiretype(ename)
387 387 return engine.decompressorreader(resp)
388 388 else:
389 389 raise error.RepoError(_("'%s' uses newer protocol %s") %
390 390 (safeurl, version))
391 391
392 392 if _compressible:
393 393 return util.compengines['zlib'].decompressorreader(resp)
394 394
395 395 return resp
396 396
397 397 def _call(self, cmd, **args):
398 398 fp = self._callstream(cmd, **args)
399 399 try:
400 400 return fp.read()
401 401 finally:
402 402 # if using keepalive, allow connection to be reused
403 403 fp.close()
404 404
405 405 def _callpush(self, cmd, cg, **args):
406 406 # have to stream bundle to a temp file because we do not have
407 407 # http 1.1 chunked transfer.
408 408
409 409 types = self.capable('unbundle')
410 410 try:
411 411 types = types.split(',')
412 412 except AttributeError:
413 413 # servers older than d1b16a746db6 will send 'unbundle' as a
414 414 # boolean capability. They only support headerless/uncompressed
415 415 # bundles.
416 416 types = [""]
417 417 for x in types:
418 418 if x in bundle2.bundletypes:
419 419 type = x
420 420 break
421 421
422 422 tempname = bundle2.writebundle(self.ui, cg, None, type)
423 423 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
424 424 headers = {r'Content-Type': r'application/mercurial-0.1'}
425 425
426 426 try:
427 427 r = self._call(cmd, data=fp, headers=headers, **args)
428 428 vals = r.split('\n', 1)
429 429 if len(vals) < 2:
430 430 raise error.ResponseError(_("unexpected response:"), r)
431 431 return vals
432 432 except urlerr.httperror:
433 433 # Catch and re-raise these so we don't try and treat them
434 434 # like generic socket errors. They lack any values in
435 435 # .args on Python 3 which breaks our socket.error block.
436 436 raise
437 437 except socket.error as err:
438 438 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
439 439 raise error.Abort(_('push failed: %s') % err.args[1])
440 440 raise error.Abort(err.args[1])
441 441 finally:
442 442 fp.close()
443 443 os.unlink(tempname)
444 444
445 445 def _calltwowaystream(self, cmd, fp, **args):
446 446 fh = None
447 447 fp_ = None
448 448 filename = None
449 449 try:
450 450 # dump bundle to disk
451 451 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
452 452 fh = os.fdopen(fd, r"wb")
453 453 d = fp.read(4096)
454 454 while d:
455 455 fh.write(d)
456 456 d = fp.read(4096)
457 457 fh.close()
458 458 # start http push
459 459 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
460 460 headers = {r'Content-Type': r'application/mercurial-0.1'}
461 461 return self._callstream(cmd, data=fp_, headers=headers, **args)
462 462 finally:
463 463 if fp_ is not None:
464 464 fp_.close()
465 465 if fh is not None:
466 466 fh.close()
467 467 os.unlink(filename)
468 468
469 469 def _callcompressable(self, cmd, **args):
470 470 return self._callstream(cmd, _compressible=True, **args)
471 471
472 472 def _abort(self, exception):
473 473 raise exception
474 474
475 475 # TODO implement interface for version 2 peers
476 476 class httpv2peer(object):
477 477 def __init__(self, ui, repourl, opener):
478 478 self.ui = ui
479 479
480 480 if repourl.endswith('/'):
481 481 repourl = repourl[:-1]
482 482
483 483 self.url = repourl
484 484 self._opener = opener
485 485 # This is an its own attribute to facilitate extensions overriding
486 486 # the default type.
487 487 self._requestbuilder = urlreq.request
488 488
489 489 def close(self):
490 490 pass
491 491
492 492 # TODO require to be part of a batched primitive, use futures.
493 493 def _call(self, name, **args):
494 494 """Call a wire protocol command with arguments."""
495 495
496 496 # TODO permissions should come from capabilities results.
497 497 permission = wireproto.commandsv2[name].permission
498 498 if permission not in ('push', 'pull'):
499 499 raise error.ProgrammingError('unknown permission type: %s' %
500 500 permission)
501 501
502 502 permission = {
503 503 'push': 'rw',
504 504 'pull': 'ro',
505 505 }[permission]
506 506
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
508 name)
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
508 permission, name)
509 509
510 510 # TODO modify user-agent to reflect v2.
511 511 headers = {
512 r'Accept': wireprotoserver.FRAMINGTYPE,
513 r'Content-Type': wireprotoserver.FRAMINGTYPE,
512 r'Accept': wireprotov2server.FRAMINGTYPE,
513 r'Content-Type': wireprotov2server.FRAMINGTYPE,
514 514 }
515 515
516 516 # TODO this should be part of a generic peer for the frame-based
517 517 # protocol.
518 518 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
519 519 buffersends=True)
520 520
521 521 request, action, meta = reactor.callcommand(name, args)
522 522 assert action == 'noop'
523 523
524 524 action, meta = reactor.flushcommands()
525 525 assert action == 'sendframes'
526 526
527 527 body = b''.join(map(bytes, meta['framegen']))
528 528 req = self._requestbuilder(pycompat.strurl(url), body, headers)
529 529 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
530 530
531 531 # TODO unify this code with httppeer.
532 532 try:
533 533 res = self._opener.open(req)
534 534 except urlerr.httperror as e:
535 535 if e.code == 401:
536 536 raise error.Abort(_('authorization failed'))
537 537
538 538 raise
539 539 except httplib.HTTPException as e:
540 540 self.ui.traceback()
541 541 raise IOError(None, e)
542 542
543 543 # TODO validate response type, wrap response to handle I/O errors.
544 544 # TODO more robust frame receiver.
545 545 results = []
546 546
547 547 while True:
548 548 frame = wireprotoframing.readframe(res)
549 549 if frame is None:
550 550 break
551 551
552 552 self.ui.note(_('received %r\n') % frame)
553 553
554 554 action, meta = reactor.onframerecv(frame)
555 555
556 556 if action == 'responsedata':
557 557 if meta['cbor']:
558 558 payload = util.bytesio(meta['data'])
559 559
560 560 decoder = cbor.CBORDecoder(payload)
561 561 while payload.tell() + 1 < len(meta['data']):
562 562 results.append(decoder.decode())
563 563 else:
564 564 results.append(meta['data'])
565 565 else:
566 566 error.ProgrammingError('unhandled action: %s' % action)
567 567
568 568 return results
569 569
570 570 def makepeer(ui, path):
571 571 u = util.url(path)
572 572 if u.query or u.fragment:
573 573 raise error.Abort(_('unsupported URL component: "%s"') %
574 574 (u.query or u.fragment))
575 575
576 576 # urllib cannot handle URLs with embedded user or passwd.
577 577 url, authinfo = u.authinfo()
578 578 ui.debug('using %s\n' % url)
579 579
580 580 opener = urlmod.opener(ui, authinfo)
581 581
582 582 return httppeer(ui, path, url, opener)
583 583
584 584 def instance(ui, path, create):
585 585 if create:
586 586 raise error.Abort(_('cannot create new http repository'))
587 587 try:
588 588 if path.startswith('https:') and not urlmod.has_https:
589 589 raise error.Abort(_('Python support for SSL and HTTPS '
590 590 'is not installed'))
591 591
592 592 inst = makepeer(ui, path)
593 593 inst._fetchcaps()
594 594
595 595 return inst
596 596 except error.RepoError as httpexception:
597 597 try:
598 598 r = statichttprepo.instance(ui, "static-" + path, create)
599 599 ui.note(_('(falling back to static-http)\n'))
600 600 return r
601 601 except error.RepoError:
602 602 raise httpexception # use the original http RepoError instead
@@ -1,1078 +1,737 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 from .thirdparty import (
16 cbor,
17 )
18 15 from .thirdparty.zope import (
19 16 interface as zi,
20 17 )
21 18 from . import (
22 19 encoding,
23 20 error,
24 21 hook,
25 22 pycompat,
26 23 util,
27 24 wireproto,
28 wireprotoframing,
29 25 wireprototypes,
26 wireprotov2server,
30 27 )
31 28 from .utils import (
32 29 procutil,
33 30 )
34 31
35 32 stringio = util.stringio
36 33
37 34 urlerr = util.urlerr
38 35 urlreq = util.urlreq
39 36
40 37 HTTP_OK = 200
41 38
42 39 HGTYPE = 'application/mercurial-0.1'
43 40 HGTYPE2 = 'application/mercurial-0.2'
44 41 HGERRTYPE = 'application/hg-error'
45 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
46 42
47 HTTPV2 = wireprototypes.HTTPV2
48 43 SSHV1 = wireprototypes.SSHV1
49 44 SSHV2 = wireprototypes.SSHV2
50 45
51 46 def decodevaluefromheaders(req, headerprefix):
52 47 """Decode a long value from multiple HTTP request headers.
53 48
54 49 Returns the value as a bytes, not a str.
55 50 """
56 51 chunks = []
57 52 i = 1
58 53 while True:
59 54 v = req.headers.get(b'%s-%d' % (headerprefix, i))
60 55 if v is None:
61 56 break
62 57 chunks.append(pycompat.bytesurl(v))
63 58 i += 1
64 59
65 60 return ''.join(chunks)
66 61
67 62 @zi.implementer(wireprototypes.baseprotocolhandler)
68 63 class httpv1protocolhandler(object):
69 64 def __init__(self, req, ui, checkperm):
70 65 self._req = req
71 66 self._ui = ui
72 67 self._checkperm = checkperm
73 68 self._protocaps = None
74 69
75 70 @property
76 71 def name(self):
77 72 return 'http-v1'
78 73
79 74 def getargs(self, args):
80 75 knownargs = self._args()
81 76 data = {}
82 77 keys = args.split()
83 78 for k in keys:
84 79 if k == '*':
85 80 star = {}
86 81 for key in knownargs.keys():
87 82 if key != 'cmd' and key not in keys:
88 83 star[key] = knownargs[key][0]
89 84 data['*'] = star
90 85 else:
91 86 data[k] = knownargs[k][0]
92 87 return [data[k] for k in keys]
93 88
94 89 def _args(self):
95 90 args = self._req.qsparams.asdictoflists()
96 91 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
97 92 if postlen:
98 93 args.update(urlreq.parseqs(
99 94 self._req.bodyfh.read(postlen), keep_blank_values=True))
100 95 return args
101 96
102 97 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
103 98 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
104 99 return args
105 100
106 101 def getprotocaps(self):
107 102 if self._protocaps is None:
108 103 value = decodevaluefromheaders(self._req, r'X-HgProto')
109 104 self._protocaps = set(value.split(' '))
110 105 return self._protocaps
111 106
112 107 def getpayload(self):
113 108 # Existing clients *always* send Content-Length.
114 109 length = int(self._req.headers[b'Content-Length'])
115 110
116 111 # If httppostargs is used, we need to read Content-Length
117 112 # minus the amount that was consumed by args.
118 113 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
119 114 return util.filechunkiter(self._req.bodyfh, limit=length)
120 115
121 116 @contextlib.contextmanager
122 117 def mayberedirectstdio(self):
123 118 oldout = self._ui.fout
124 119 olderr = self._ui.ferr
125 120
126 121 out = util.stringio()
127 122
128 123 try:
129 124 self._ui.fout = out
130 125 self._ui.ferr = out
131 126 yield out
132 127 finally:
133 128 self._ui.fout = oldout
134 129 self._ui.ferr = olderr
135 130
136 131 def client(self):
137 132 return 'remote:%s:%s:%s' % (
138 133 self._req.urlscheme,
139 134 urlreq.quote(self._req.remotehost or ''),
140 135 urlreq.quote(self._req.remoteuser or ''))
141 136
142 137 def addcapabilities(self, repo, caps):
143 138 caps.append(b'batch')
144 139
145 140 caps.append('httpheader=%d' %
146 141 repo.ui.configint('server', 'maxhttpheaderlen'))
147 142 if repo.ui.configbool('experimental', 'httppostargs'):
148 143 caps.append('httppostargs')
149 144
150 145 # FUTURE advertise 0.2rx once support is implemented
151 146 # FUTURE advertise minrx and mintx after consulting config option
152 147 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
153 148
154 149 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
155 150 if compengines:
156 151 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
157 152 for e in compengines)
158 153 caps.append('compression=%s' % comptypes)
159 154
160 155 return caps
161 156
162 157 def checkperm(self, perm):
163 158 return self._checkperm(perm)
164 159
165 160 # This method exists mostly so that extensions like remotefilelog can
166 161 # disable a kludgey legacy method only over http. As of early 2018,
167 162 # there are no other known users, so with any luck we can discard this
168 163 # hook if remotefilelog becomes a first-party extension.
169 164 def iscmd(cmd):
170 165 return cmd in wireproto.commands
171 166
172 167 def handlewsgirequest(rctx, req, res, checkperm):
173 168 """Possibly process a wire protocol request.
174 169
175 170 If the current request is a wire protocol request, the request is
176 171 processed by this function.
177 172
178 173 ``req`` is a ``parsedrequest`` instance.
179 174 ``res`` is a ``wsgiresponse`` instance.
180 175
181 176 Returns a bool indicating if the request was serviced. If set, the caller
182 177 should stop processing the request, as a response has already been issued.
183 178 """
184 179 # Avoid cycle involving hg module.
185 180 from .hgweb import common as hgwebcommon
186 181
187 182 repo = rctx.repo
188 183
189 184 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
190 185 # string parameter. If it isn't present, this isn't a wire protocol
191 186 # request.
192 187 if 'cmd' not in req.qsparams:
193 188 return False
194 189
195 190 cmd = req.qsparams['cmd']
196 191
197 192 # The "cmd" request parameter is used by both the wire protocol and hgweb.
198 193 # While not all wire protocol commands are available for all transports,
199 194 # if we see a "cmd" value that resembles a known wire protocol command, we
200 195 # route it to a protocol handler. This is better than routing possible
201 196 # wire protocol requests to hgweb because it prevents hgweb from using
202 197 # known wire protocol commands and it is less confusing for machine
203 198 # clients.
204 199 if not iscmd(cmd):
205 200 return False
206 201
207 202 # The "cmd" query string argument is only valid on the root path of the
208 203 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
209 204 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
210 205 # in this case. We send an HTTP 404 for backwards compatibility reasons.
211 206 if req.dispatchpath:
212 207 res.status = hgwebcommon.statusmessage(404)
213 208 res.headers['Content-Type'] = HGTYPE
214 209 # TODO This is not a good response to issue for this request. This
215 210 # is mostly for BC for now.
216 211 res.setbodybytes('0\n%s\n' % b'Not Found')
217 212 return True
218 213
219 214 proto = httpv1protocolhandler(req, repo.ui,
220 215 lambda perm: checkperm(rctx, req, perm))
221 216
222 217 # The permissions checker should be the only thing that can raise an
223 218 # ErrorResponse. It is kind of a layer violation to catch an hgweb
224 219 # exception here. So consider refactoring into a exception type that
225 220 # is associated with the wire protocol.
226 221 try:
227 222 _callhttp(repo, req, res, proto, cmd)
228 223 except hgwebcommon.ErrorResponse as e:
229 224 for k, v in e.headers:
230 225 res.headers[k] = v
231 226 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
232 227 # TODO This response body assumes the failed command was
233 228 # "unbundle." That assumption is not always valid.
234 229 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
235 230
236 231 return True
237 232
238 233 def handlewsgiapirequest(rctx, req, res, checkperm):
239 234 """Handle requests to /api/*."""
240 235 assert req.dispatchparts[0] == b'api'
241 236
242 237 repo = rctx.repo
243 238
244 239 # This whole URL space is experimental for now. But we want to
245 240 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
246 241 if not repo.ui.configbool('experimental', 'web.apiserver'):
247 242 res.status = b'404 Not Found'
248 243 res.headers[b'Content-Type'] = b'text/plain'
249 244 res.setbodybytes(_('Experimental API server endpoint not enabled'))
250 245 return
251 246
252 247 # The URL space is /api/<protocol>/*. The structure of URLs under varies
253 248 # by <protocol>.
254 249
255 250 # Registered APIs are made available via config options of the name of
256 251 # the protocol.
257 252 availableapis = set()
258 253 for k, v in API_HANDLERS.items():
259 254 section, option = v['config']
260 255 if repo.ui.configbool(section, option):
261 256 availableapis.add(k)
262 257
263 258 # Requests to /api/ list available APIs.
264 259 if req.dispatchparts == [b'api']:
265 260 res.status = b'200 OK'
266 261 res.headers[b'Content-Type'] = b'text/plain'
267 262 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
268 263 'one of the following:\n')]
269 264 if availableapis:
270 265 lines.extend(sorted(availableapis))
271 266 else:
272 267 lines.append(_('(no available APIs)\n'))
273 268 res.setbodybytes(b'\n'.join(lines))
274 269 return
275 270
276 271 proto = req.dispatchparts[1]
277 272
278 273 if proto not in API_HANDLERS:
279 274 res.status = b'404 Not Found'
280 275 res.headers[b'Content-Type'] = b'text/plain'
281 276 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
282 277 proto, b', '.join(sorted(availableapis))))
283 278 return
284 279
285 280 if proto not in availableapis:
286 281 res.status = b'404 Not Found'
287 282 res.headers[b'Content-Type'] = b'text/plain'
288 283 res.setbodybytes(_('API %s not enabled\n') % proto)
289 284 return
290 285
291 286 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
292 287 req.dispatchparts[2:])
293 288
294 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
295 from .hgweb import common as hgwebcommon
296
297 # URL space looks like: <permissions>/<command>, where <permission> can
298 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
299
300 # Root URL does nothing meaningful... yet.
301 if not urlparts:
302 res.status = b'200 OK'
303 res.headers[b'Content-Type'] = b'text/plain'
304 res.setbodybytes(_('HTTP version 2 API handler'))
305 return
306
307 if len(urlparts) == 1:
308 res.status = b'404 Not Found'
309 res.headers[b'Content-Type'] = b'text/plain'
310 res.setbodybytes(_('do not know how to process %s\n') %
311 req.dispatchpath)
312 return
313
314 permission, command = urlparts[0:2]
315
316 if permission not in (b'ro', b'rw'):
317 res.status = b'404 Not Found'
318 res.headers[b'Content-Type'] = b'text/plain'
319 res.setbodybytes(_('unknown permission: %s') % permission)
320 return
321
322 if req.method != 'POST':
323 res.status = b'405 Method Not Allowed'
324 res.headers[b'Allow'] = b'POST'
325 res.setbodybytes(_('commands require POST requests'))
326 return
327
328 # At some point we'll want to use our own API instead of recycling the
329 # behavior of version 1 of the wire protocol...
330 # TODO return reasonable responses - not responses that overload the
331 # HTTP status line message for error reporting.
332 try:
333 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
334 except hgwebcommon.ErrorResponse as e:
335 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
336 for k, v in e.headers:
337 res.headers[k] = v
338 res.setbodybytes('permission denied')
339 return
340
341 # We have a special endpoint to reflect the request back at the client.
342 if command == b'debugreflect':
343 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
344 return
345
346 # Extra commands that we handle that aren't really wire protocol
347 # commands. Think extra hard before making this hackery available to
348 # extension.
349 extracommands = {'multirequest'}
350
351 if command not in wireproto.commandsv2 and command not in extracommands:
352 res.status = b'404 Not Found'
353 res.headers[b'Content-Type'] = b'text/plain'
354 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
355 return
356
357 repo = rctx.repo
358 ui = repo.ui
359
360 proto = httpv2protocolhandler(req, ui)
361
362 if (not wireproto.commandsv2.commandavailable(command, proto)
363 and command not in extracommands):
364 res.status = b'404 Not Found'
365 res.headers[b'Content-Type'] = b'text/plain'
366 res.setbodybytes(_('invalid wire protocol command: %s') % command)
367 return
368
369 # TODO consider cases where proxies may add additional Accept headers.
370 if req.headers.get(b'Accept') != FRAMINGTYPE:
371 res.status = b'406 Not Acceptable'
372 res.headers[b'Content-Type'] = b'text/plain'
373 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
374 % FRAMINGTYPE)
375 return
376
377 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
378 res.status = b'415 Unsupported Media Type'
379 # TODO we should send a response with appropriate media type,
380 # since client does Accept it.
381 res.headers[b'Content-Type'] = b'text/plain'
382 res.setbodybytes(_('client MUST send Content-Type header with '
383 'value: %s\n') % FRAMINGTYPE)
384 return
385
386 _processhttpv2request(ui, repo, req, res, permission, command, proto)
387
388 def _processhttpv2reflectrequest(ui, repo, req, res):
389 """Reads unified frame protocol request and dumps out state to client.
390
391 This special endpoint can be used to help debug the wire protocol.
392
393 Instead of routing the request through the normal dispatch mechanism,
394 we instead read all frames, decode them, and feed them into our state
395 tracker. We then dump the log of all that activity back out to the
396 client.
397 """
398 import json
399
400 # Reflection APIs have a history of being abused, accidentally disclosing
401 # sensitive data, etc. So we have a config knob.
402 if not ui.configbool('experimental', 'web.api.debugreflect'):
403 res.status = b'404 Not Found'
404 res.headers[b'Content-Type'] = b'text/plain'
405 res.setbodybytes(_('debugreflect service not available'))
406 return
407
408 # We assume we have a unified framing protocol request body.
409
410 reactor = wireprotoframing.serverreactor()
411 states = []
412
413 while True:
414 frame = wireprotoframing.readframe(req.bodyfh)
415
416 if not frame:
417 states.append(b'received: <no frame>')
418 break
419
420 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
421 frame.requestid,
422 frame.payload))
423
424 action, meta = reactor.onframerecv(frame)
425 states.append(json.dumps((action, meta), sort_keys=True,
426 separators=(', ', ': ')))
427
428 action, meta = reactor.oninputeof()
429 meta['action'] = action
430 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
431
432 res.status = b'200 OK'
433 res.headers[b'Content-Type'] = b'text/plain'
434 res.setbodybytes(b'\n'.join(states))
435
436 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
437 """Post-validation handler for HTTPv2 requests.
438
439 Called when the HTTP request contains unified frame-based protocol
440 frames for evaluation.
441 """
442 # TODO Some HTTP clients are full duplex and can receive data before
443 # the entire request is transmitted. Figure out a way to indicate support
444 # for that so we can opt into full duplex mode.
445 reactor = wireprotoframing.serverreactor(deferoutput=True)
446 seencommand = False
447
448 outstream = reactor.makeoutputstream()
449
450 while True:
451 frame = wireprotoframing.readframe(req.bodyfh)
452 if not frame:
453 break
454
455 action, meta = reactor.onframerecv(frame)
456
457 if action == 'wantframe':
458 # Need more data before we can do anything.
459 continue
460 elif action == 'runcommand':
461 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
462 reqcommand, reactor, outstream,
463 meta, issubsequent=seencommand)
464
465 if sentoutput:
466 return
467
468 seencommand = True
469
470 elif action == 'error':
471 # TODO define proper error mechanism.
472 res.status = b'200 OK'
473 res.headers[b'Content-Type'] = b'text/plain'
474 res.setbodybytes(meta['message'] + b'\n')
475 return
476 else:
477 raise error.ProgrammingError(
478 'unhandled action from frame processor: %s' % action)
479
480 action, meta = reactor.oninputeof()
481 if action == 'sendframes':
482 # We assume we haven't started sending the response yet. If we're
483 # wrong, the response type will raise an exception.
484 res.status = b'200 OK'
485 res.headers[b'Content-Type'] = FRAMINGTYPE
486 res.setbodygen(meta['framegen'])
487 elif action == 'noop':
488 pass
489 else:
490 raise error.ProgrammingError('unhandled action from frame processor: %s'
491 % action)
492
493 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
494 outstream, command, issubsequent):
495 """Dispatch a wire protocol command made from HTTPv2 requests.
496
497 The authenticated permission (``authedperm``) along with the original
498 command from the URL (``reqcommand``) are passed in.
499 """
500 # We already validated that the session has permissions to perform the
501 # actions in ``authedperm``. In the unified frame protocol, the canonical
502 # command to run is expressed in a frame. However, the URL also requested
503 # to run a specific command. We need to be careful that the command we
504 # run doesn't have permissions requirements greater than what was granted
505 # by ``authedperm``.
506 #
507 # Our rule for this is we only allow one command per HTTP request and
508 # that command must match the command in the URL. However, we make
509 # an exception for the ``multirequest`` URL. This URL is allowed to
510 # execute multiple commands. We double check permissions of each command
511 # as it is invoked to ensure there is no privilege escalation.
512 # TODO consider allowing multiple commands to regular command URLs
513 # iff each command is the same.
514
515 proto = httpv2protocolhandler(req, ui, args=command['args'])
516
517 if reqcommand == b'multirequest':
518 if not wireproto.commandsv2.commandavailable(command['command'], proto):
519 # TODO proper error mechanism
520 res.status = b'200 OK'
521 res.headers[b'Content-Type'] = b'text/plain'
522 res.setbodybytes(_('wire protocol command not available: %s') %
523 command['command'])
524 return True
525
526 # TODO don't use assert here, since it may be elided by -O.
527 assert authedperm in (b'ro', b'rw')
528 wirecommand = wireproto.commandsv2[command['command']]
529 assert wirecommand.permission in ('push', 'pull')
530
531 if authedperm == b'ro' and wirecommand.permission != 'pull':
532 # TODO proper error mechanism
533 res.status = b'403 Forbidden'
534 res.headers[b'Content-Type'] = b'text/plain'
535 res.setbodybytes(_('insufficient permissions to execute '
536 'command: %s') % command['command'])
537 return True
538
539 # TODO should we also call checkperm() here? Maybe not if we're going
540 # to overhaul that API. The granted scope from the URL check should
541 # be good enough.
542
543 else:
544 # Don't allow multiple commands outside of ``multirequest`` URL.
545 if issubsequent:
546 # TODO proper error mechanism
547 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = b'text/plain'
549 res.setbodybytes(_('multiple commands cannot be issued to this '
550 'URL'))
551 return True
552
553 if reqcommand != command['command']:
554 # TODO define proper error mechanism
555 res.status = b'200 OK'
556 res.headers[b'Content-Type'] = b'text/plain'
557 res.setbodybytes(_('command in frame must match command in URL'))
558 return True
559
560 rsp = wireproto.dispatch(repo, proto, command['command'])
561
562 res.status = b'200 OK'
563 res.headers[b'Content-Type'] = FRAMINGTYPE
564
565 if isinstance(rsp, wireprototypes.bytesresponse):
566 action, meta = reactor.onbytesresponseready(outstream,
567 command['requestid'],
568 rsp.data)
569 elif isinstance(rsp, wireprototypes.cborresponse):
570 encoded = cbor.dumps(rsp.value, canonical=True)
571 action, meta = reactor.onbytesresponseready(outstream,
572 command['requestid'],
573 encoded,
574 iscbor=True)
575 else:
576 action, meta = reactor.onapplicationerror(
577 _('unhandled response type from wire proto command'))
578
579 if action == 'sendframes':
580 res.setbodygen(meta['framegen'])
581 return True
582 elif action == 'noop':
583 return False
584 else:
585 raise error.ProgrammingError('unhandled event from reactor: %s' %
586 action)
587
588 289 # Maps API name to metadata so custom API can be registered.
589 290 API_HANDLERS = {
590 HTTPV2: {
291 wireprotov2server.HTTPV2: {
591 292 'config': ('experimental', 'web.api.http-v2'),
592 'handler': _handlehttpv2request,
293 'handler': wireprotov2server.handlehttpv2request,
593 294 },
594 295 }
595 296
596 @zi.implementer(wireprototypes.baseprotocolhandler)
597 class httpv2protocolhandler(object):
598 def __init__(self, req, ui, args=None):
599 self._req = req
600 self._ui = ui
601 self._args = args
602
603 @property
604 def name(self):
605 return HTTPV2
606
607 def getargs(self, args):
608 data = {}
609 for k, typ in args.items():
610 if k == '*':
611 raise NotImplementedError('do not support * args')
612 elif k in self._args:
613 # TODO consider validating value types.
614 data[k] = self._args[k]
615
616 return data
617
618 def getprotocaps(self):
619 # Protocol capabilities are currently not implemented for HTTP V2.
620 return set()
621
622 def getpayload(self):
623 raise NotImplementedError
624
625 @contextlib.contextmanager
626 def mayberedirectstdio(self):
627 raise NotImplementedError
628
629 def client(self):
630 raise NotImplementedError
631
632 def addcapabilities(self, repo, caps):
633 return caps
634
635 def checkperm(self, perm):
636 raise NotImplementedError
637
638 297 def _httpresponsetype(ui, proto, prefer_uncompressed):
639 298 """Determine the appropriate response type and compression settings.
640 299
641 300 Returns a tuple of (mediatype, compengine, engineopts).
642 301 """
643 302 # Determine the response media type and compression engine based
644 303 # on the request parameters.
645 304
646 305 if '0.2' in proto.getprotocaps():
647 306 # All clients are expected to support uncompressed data.
648 307 if prefer_uncompressed:
649 308 return HGTYPE2, util._noopengine(), {}
650 309
651 310 # Now find an agreed upon compression format.
652 311 compformats = wireproto.clientcompressionsupport(proto)
653 312 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
654 313 if engine.wireprotosupport().name in compformats:
655 314 opts = {}
656 315 level = ui.configint('server', '%slevel' % engine.name())
657 316 if level is not None:
658 317 opts['level'] = level
659 318
660 319 return HGTYPE2, engine, opts
661 320
662 321 # No mutually supported compression format. Fall back to the
663 322 # legacy protocol.
664 323
665 324 # Don't allow untrusted settings because disabling compression or
666 325 # setting a very high compression level could lead to flooding
667 326 # the server's network or CPU.
668 327 opts = {'level': ui.configint('server', 'zliblevel')}
669 328 return HGTYPE, util.compengines['zlib'], opts
670 329
671 330 def _callhttp(repo, req, res, proto, cmd):
672 331 # Avoid cycle involving hg module.
673 332 from .hgweb import common as hgwebcommon
674 333
675 334 def genversion2(gen, engine, engineopts):
676 335 # application/mercurial-0.2 always sends a payload header
677 336 # identifying the compression engine.
678 337 name = engine.wireprotosupport().name
679 338 assert 0 < len(name) < 256
680 339 yield struct.pack('B', len(name))
681 340 yield name
682 341
683 342 for chunk in gen:
684 343 yield chunk
685 344
686 345 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
687 346 if code == HTTP_OK:
688 347 res.status = '200 Script output follows'
689 348 else:
690 349 res.status = hgwebcommon.statusmessage(code)
691 350
692 351 res.headers['Content-Type'] = contenttype
693 352
694 353 if bodybytes is not None:
695 354 res.setbodybytes(bodybytes)
696 355 if bodygen is not None:
697 356 res.setbodygen(bodygen)
698 357
699 358 if not wireproto.commands.commandavailable(cmd, proto):
700 359 setresponse(HTTP_OK, HGERRTYPE,
701 360 _('requested wire protocol command is not available over '
702 361 'HTTP'))
703 362 return
704 363
705 364 proto.checkperm(wireproto.commands[cmd].permission)
706 365
707 366 rsp = wireproto.dispatch(repo, proto, cmd)
708 367
709 368 if isinstance(rsp, bytes):
710 369 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
711 370 elif isinstance(rsp, wireprototypes.bytesresponse):
712 371 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
713 372 elif isinstance(rsp, wireprototypes.streamreslegacy):
714 373 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
715 374 elif isinstance(rsp, wireprototypes.streamres):
716 375 gen = rsp.gen
717 376
718 377 # This code for compression should not be streamres specific. It
719 378 # is here because we only compress streamres at the moment.
720 379 mediatype, engine, engineopts = _httpresponsetype(
721 380 repo.ui, proto, rsp.prefer_uncompressed)
722 381 gen = engine.compressstream(gen, engineopts)
723 382
724 383 if mediatype == HGTYPE2:
725 384 gen = genversion2(gen, engine, engineopts)
726 385
727 386 setresponse(HTTP_OK, mediatype, bodygen=gen)
728 387 elif isinstance(rsp, wireprototypes.pushres):
729 388 rsp = '%d\n%s' % (rsp.res, rsp.output)
730 389 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
731 390 elif isinstance(rsp, wireprototypes.pusherr):
732 391 rsp = '0\n%s\n' % rsp.res
733 392 res.drain = True
734 393 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
735 394 elif isinstance(rsp, wireprototypes.ooberror):
736 395 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
737 396 else:
738 397 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
739 398
740 399 def _sshv1respondbytes(fout, value):
741 400 """Send a bytes response for protocol version 1."""
742 401 fout.write('%d\n' % len(value))
743 402 fout.write(value)
744 403 fout.flush()
745 404
746 405 def _sshv1respondstream(fout, source):
747 406 write = fout.write
748 407 for chunk in source.gen:
749 408 write(chunk)
750 409 fout.flush()
751 410
752 411 def _sshv1respondooberror(fout, ferr, rsp):
753 412 ferr.write(b'%s\n-\n' % rsp)
754 413 ferr.flush()
755 414 fout.write(b'\n')
756 415 fout.flush()
757 416
758 417 @zi.implementer(wireprototypes.baseprotocolhandler)
759 418 class sshv1protocolhandler(object):
760 419 """Handler for requests services via version 1 of SSH protocol."""
761 420 def __init__(self, ui, fin, fout):
762 421 self._ui = ui
763 422 self._fin = fin
764 423 self._fout = fout
765 424 self._protocaps = set()
766 425
767 426 @property
768 427 def name(self):
769 428 return wireprototypes.SSHV1
770 429
771 430 def getargs(self, args):
772 431 data = {}
773 432 keys = args.split()
774 433 for n in xrange(len(keys)):
775 434 argline = self._fin.readline()[:-1]
776 435 arg, l = argline.split()
777 436 if arg not in keys:
778 437 raise error.Abort(_("unexpected parameter %r") % arg)
779 438 if arg == '*':
780 439 star = {}
781 440 for k in xrange(int(l)):
782 441 argline = self._fin.readline()[:-1]
783 442 arg, l = argline.split()
784 443 val = self._fin.read(int(l))
785 444 star[arg] = val
786 445 data['*'] = star
787 446 else:
788 447 val = self._fin.read(int(l))
789 448 data[arg] = val
790 449 return [data[k] for k in keys]
791 450
792 451 def getprotocaps(self):
793 452 return self._protocaps
794 453
795 454 def getpayload(self):
796 455 # We initially send an empty response. This tells the client it is
797 456 # OK to start sending data. If a client sees any other response, it
798 457 # interprets it as an error.
799 458 _sshv1respondbytes(self._fout, b'')
800 459
801 460 # The file is in the form:
802 461 #
803 462 # <chunk size>\n<chunk>
804 463 # ...
805 464 # 0\n
806 465 count = int(self._fin.readline())
807 466 while count:
808 467 yield self._fin.read(count)
809 468 count = int(self._fin.readline())
810 469
811 470 @contextlib.contextmanager
812 471 def mayberedirectstdio(self):
813 472 yield None
814 473
815 474 def client(self):
816 475 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
817 476 return 'remote:ssh:' + client
818 477
819 478 def addcapabilities(self, repo, caps):
820 479 if self.name == wireprototypes.SSHV1:
821 480 caps.append(b'protocaps')
822 481 caps.append(b'batch')
823 482 return caps
824 483
825 484 def checkperm(self, perm):
826 485 pass
827 486
828 487 class sshv2protocolhandler(sshv1protocolhandler):
829 488 """Protocol handler for version 2 of the SSH protocol."""
830 489
831 490 @property
832 491 def name(self):
833 492 return wireprototypes.SSHV2
834 493
835 494 def addcapabilities(self, repo, caps):
836 495 return caps
837 496
838 497 def _runsshserver(ui, repo, fin, fout, ev):
839 498 # This function operates like a state machine of sorts. The following
840 499 # states are defined:
841 500 #
842 501 # protov1-serving
843 502 # Server is in protocol version 1 serving mode. Commands arrive on
844 503 # new lines. These commands are processed in this state, one command
845 504 # after the other.
846 505 #
847 506 # protov2-serving
848 507 # Server is in protocol version 2 serving mode.
849 508 #
850 509 # upgrade-initial
851 510 # The server is going to process an upgrade request.
852 511 #
853 512 # upgrade-v2-filter-legacy-handshake
854 513 # The protocol is being upgraded to version 2. The server is expecting
855 514 # the legacy handshake from version 1.
856 515 #
857 516 # upgrade-v2-finish
858 517 # The upgrade to version 2 of the protocol is imminent.
859 518 #
860 519 # shutdown
861 520 # The server is shutting down, possibly in reaction to a client event.
862 521 #
863 522 # And here are their transitions:
864 523 #
865 524 # protov1-serving -> shutdown
866 525 # When server receives an empty request or encounters another
867 526 # error.
868 527 #
869 528 # protov1-serving -> upgrade-initial
870 529 # An upgrade request line was seen.
871 530 #
872 531 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
873 532 # Upgrade to version 2 in progress. Server is expecting to
874 533 # process a legacy handshake.
875 534 #
876 535 # upgrade-v2-filter-legacy-handshake -> shutdown
877 536 # Client did not fulfill upgrade handshake requirements.
878 537 #
879 538 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
880 539 # Client fulfilled version 2 upgrade requirements. Finishing that
881 540 # upgrade.
882 541 #
883 542 # upgrade-v2-finish -> protov2-serving
884 543 # Protocol upgrade to version 2 complete. Server can now speak protocol
885 544 # version 2.
886 545 #
887 546 # protov2-serving -> protov1-serving
888 547 # Ths happens by default since protocol version 2 is the same as
889 548 # version 1 except for the handshake.
890 549
891 550 state = 'protov1-serving'
892 551 proto = sshv1protocolhandler(ui, fin, fout)
893 552 protoswitched = False
894 553
895 554 while not ev.is_set():
896 555 if state == 'protov1-serving':
897 556 # Commands are issued on new lines.
898 557 request = fin.readline()[:-1]
899 558
900 559 # Empty lines signal to terminate the connection.
901 560 if not request:
902 561 state = 'shutdown'
903 562 continue
904 563
905 564 # It looks like a protocol upgrade request. Transition state to
906 565 # handle it.
907 566 if request.startswith(b'upgrade '):
908 567 if protoswitched:
909 568 _sshv1respondooberror(fout, ui.ferr,
910 569 b'cannot upgrade protocols multiple '
911 570 b'times')
912 571 state = 'shutdown'
913 572 continue
914 573
915 574 state = 'upgrade-initial'
916 575 continue
917 576
918 577 available = wireproto.commands.commandavailable(request, proto)
919 578
920 579 # This command isn't available. Send an empty response and go
921 580 # back to waiting for a new command.
922 581 if not available:
923 582 _sshv1respondbytes(fout, b'')
924 583 continue
925 584
926 585 rsp = wireproto.dispatch(repo, proto, request)
927 586
928 587 if isinstance(rsp, bytes):
929 588 _sshv1respondbytes(fout, rsp)
930 589 elif isinstance(rsp, wireprototypes.bytesresponse):
931 590 _sshv1respondbytes(fout, rsp.data)
932 591 elif isinstance(rsp, wireprototypes.streamres):
933 592 _sshv1respondstream(fout, rsp)
934 593 elif isinstance(rsp, wireprototypes.streamreslegacy):
935 594 _sshv1respondstream(fout, rsp)
936 595 elif isinstance(rsp, wireprototypes.pushres):
937 596 _sshv1respondbytes(fout, b'')
938 597 _sshv1respondbytes(fout, b'%d' % rsp.res)
939 598 elif isinstance(rsp, wireprototypes.pusherr):
940 599 _sshv1respondbytes(fout, rsp.res)
941 600 elif isinstance(rsp, wireprototypes.ooberror):
942 601 _sshv1respondooberror(fout, ui.ferr, rsp.message)
943 602 else:
944 603 raise error.ProgrammingError('unhandled response type from '
945 604 'wire protocol command: %s' % rsp)
946 605
947 606 # For now, protocol version 2 serving just goes back to version 1.
948 607 elif state == 'protov2-serving':
949 608 state = 'protov1-serving'
950 609 continue
951 610
952 611 elif state == 'upgrade-initial':
953 612 # We should never transition into this state if we've switched
954 613 # protocols.
955 614 assert not protoswitched
956 615 assert proto.name == wireprototypes.SSHV1
957 616
958 617 # Expected: upgrade <token> <capabilities>
959 618 # If we get something else, the request is malformed. It could be
960 619 # from a future client that has altered the upgrade line content.
961 620 # We treat this as an unknown command.
962 621 try:
963 622 token, caps = request.split(b' ')[1:]
964 623 except ValueError:
965 624 _sshv1respondbytes(fout, b'')
966 625 state = 'protov1-serving'
967 626 continue
968 627
969 628 # Send empty response if we don't support upgrading protocols.
970 629 if not ui.configbool('experimental', 'sshserver.support-v2'):
971 630 _sshv1respondbytes(fout, b'')
972 631 state = 'protov1-serving'
973 632 continue
974 633
975 634 try:
976 635 caps = urlreq.parseqs(caps)
977 636 except ValueError:
978 637 _sshv1respondbytes(fout, b'')
979 638 state = 'protov1-serving'
980 639 continue
981 640
982 641 # We don't see an upgrade request to protocol version 2. Ignore
983 642 # the upgrade request.
984 643 wantedprotos = caps.get(b'proto', [b''])[0]
985 644 if SSHV2 not in wantedprotos:
986 645 _sshv1respondbytes(fout, b'')
987 646 state = 'protov1-serving'
988 647 continue
989 648
990 649 # It looks like we can honor this upgrade request to protocol 2.
991 650 # Filter the rest of the handshake protocol request lines.
992 651 state = 'upgrade-v2-filter-legacy-handshake'
993 652 continue
994 653
995 654 elif state == 'upgrade-v2-filter-legacy-handshake':
996 655 # Client should have sent legacy handshake after an ``upgrade``
997 656 # request. Expected lines:
998 657 #
999 658 # hello
1000 659 # between
1001 660 # pairs 81
1002 661 # 0000...-0000...
1003 662
1004 663 ok = True
1005 664 for line in (b'hello', b'between', b'pairs 81'):
1006 665 request = fin.readline()[:-1]
1007 666
1008 667 if request != line:
1009 668 _sshv1respondooberror(fout, ui.ferr,
1010 669 b'malformed handshake protocol: '
1011 670 b'missing %s' % line)
1012 671 ok = False
1013 672 state = 'shutdown'
1014 673 break
1015 674
1016 675 if not ok:
1017 676 continue
1018 677
1019 678 request = fin.read(81)
1020 679 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
1021 680 _sshv1respondooberror(fout, ui.ferr,
1022 681 b'malformed handshake protocol: '
1023 682 b'missing between argument value')
1024 683 state = 'shutdown'
1025 684 continue
1026 685
1027 686 state = 'upgrade-v2-finish'
1028 687 continue
1029 688
1030 689 elif state == 'upgrade-v2-finish':
1031 690 # Send the upgrade response.
1032 691 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1033 692 servercaps = wireproto.capabilities(repo, proto)
1034 693 rsp = b'capabilities: %s' % servercaps.data
1035 694 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1036 695 fout.flush()
1037 696
1038 697 proto = sshv2protocolhandler(ui, fin, fout)
1039 698 protoswitched = True
1040 699
1041 700 state = 'protov2-serving'
1042 701 continue
1043 702
1044 703 elif state == 'shutdown':
1045 704 break
1046 705
1047 706 else:
1048 707 raise error.ProgrammingError('unhandled ssh server state: %s' %
1049 708 state)
1050 709
1051 710 class sshserver(object):
1052 711 def __init__(self, ui, repo, logfh=None):
1053 712 self._ui = ui
1054 713 self._repo = repo
1055 714 self._fin = ui.fin
1056 715 self._fout = ui.fout
1057 716
1058 717 # Log write I/O to stdout and stderr if configured.
1059 718 if logfh:
1060 719 self._fout = util.makeloggingfileobject(
1061 720 logfh, self._fout, 'o', logdata=True)
1062 721 ui.ferr = util.makeloggingfileobject(
1063 722 logfh, ui.ferr, 'e', logdata=True)
1064 723
1065 724 hook.redirect(True)
1066 725 ui.fout = repo.ui.fout = ui.ferr
1067 726
1068 727 # Prevent insertion/deletion of CRs
1069 728 procutil.setbinary(self._fin)
1070 729 procutil.setbinary(self._fout)
1071 730
1072 731 def serve_forever(self):
1073 732 self.serveuntil(threading.Event())
1074 733 sys.exit(0)
1075 734
1076 735 def serveuntil(self, ev):
1077 736 """Serve until a threading.Event is set."""
1078 737 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
This diff has been collapsed as it changes many lines, (716 lines changed) Show them Hide them
@@ -1,1078 +1,364 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 import struct
11 import sys
12 import threading
13 10
14 11 from .i18n import _
15 12 from .thirdparty import (
16 13 cbor,
17 14 )
18 15 from .thirdparty.zope import (
19 16 interface as zi,
20 17 )
21 18 from . import (
22 encoding,
23 19 error,
24 hook,
25 20 pycompat,
26 util,
27 21 wireproto,
28 22 wireprotoframing,
29 23 wireprototypes,
30 24 )
31 from .utils import (
32 procutil,
33 )
34 25
35 stringio = util.stringio
36
37 urlerr = util.urlerr
38 urlreq = util.urlreq
39
40 HTTP_OK = 200
41
42 HGTYPE = 'application/mercurial-0.1'
43 HGTYPE2 = 'application/mercurial-0.2'
44 HGERRTYPE = 'application/hg-error'
45 26 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
46 27
47 28 HTTPV2 = wireprototypes.HTTPV2
48 SSHV1 = wireprototypes.SSHV1
49 SSHV2 = wireprototypes.SSHV2
50 29
51 def decodevaluefromheaders(req, headerprefix):
52 """Decode a long value from multiple HTTP request headers.
53
54 Returns the value as a bytes, not a str.
55 """
56 chunks = []
57 i = 1
58 while True:
59 v = req.headers.get(b'%s-%d' % (headerprefix, i))
60 if v is None:
61 break
62 chunks.append(pycompat.bytesurl(v))
63 i += 1
64
65 return ''.join(chunks)
66
67 @zi.implementer(wireprototypes.baseprotocolhandler)
68 class httpv1protocolhandler(object):
69 def __init__(self, req, ui, checkperm):
70 self._req = req
71 self._ui = ui
72 self._checkperm = checkperm
73 self._protocaps = None
74
75 @property
76 def name(self):
77 return 'http-v1'
78
79 def getargs(self, args):
80 knownargs = self._args()
81 data = {}
82 keys = args.split()
83 for k in keys:
84 if k == '*':
85 star = {}
86 for key in knownargs.keys():
87 if key != 'cmd' and key not in keys:
88 star[key] = knownargs[key][0]
89 data['*'] = star
90 else:
91 data[k] = knownargs[k][0]
92 return [data[k] for k in keys]
93
94 def _args(self):
95 args = self._req.qsparams.asdictoflists()
96 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
97 if postlen:
98 args.update(urlreq.parseqs(
99 self._req.bodyfh.read(postlen), keep_blank_values=True))
100 return args
101
102 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
103 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
104 return args
105
106 def getprotocaps(self):
107 if self._protocaps is None:
108 value = decodevaluefromheaders(self._req, r'X-HgProto')
109 self._protocaps = set(value.split(' '))
110 return self._protocaps
111
112 def getpayload(self):
113 # Existing clients *always* send Content-Length.
114 length = int(self._req.headers[b'Content-Length'])
115
116 # If httppostargs is used, we need to read Content-Length
117 # minus the amount that was consumed by args.
118 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
119 return util.filechunkiter(self._req.bodyfh, limit=length)
120
121 @contextlib.contextmanager
122 def mayberedirectstdio(self):
123 oldout = self._ui.fout
124 olderr = self._ui.ferr
125
126 out = util.stringio()
127
128 try:
129 self._ui.fout = out
130 self._ui.ferr = out
131 yield out
132 finally:
133 self._ui.fout = oldout
134 self._ui.ferr = olderr
135
136 def client(self):
137 return 'remote:%s:%s:%s' % (
138 self._req.urlscheme,
139 urlreq.quote(self._req.remotehost or ''),
140 urlreq.quote(self._req.remoteuser or ''))
141
142 def addcapabilities(self, repo, caps):
143 caps.append(b'batch')
144
145 caps.append('httpheader=%d' %
146 repo.ui.configint('server', 'maxhttpheaderlen'))
147 if repo.ui.configbool('experimental', 'httppostargs'):
148 caps.append('httppostargs')
149
150 # FUTURE advertise 0.2rx once support is implemented
151 # FUTURE advertise minrx and mintx after consulting config option
152 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
153
154 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
155 if compengines:
156 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
157 for e in compengines)
158 caps.append('compression=%s' % comptypes)
159
160 return caps
161
162 def checkperm(self, perm):
163 return self._checkperm(perm)
164
165 # This method exists mostly so that extensions like remotefilelog can
166 # disable a kludgey legacy method only over http. As of early 2018,
167 # there are no other known users, so with any luck we can discard this
168 # hook if remotefilelog becomes a first-party extension.
169 def iscmd(cmd):
170 return cmd in wireproto.commands
171
172 def handlewsgirequest(rctx, req, res, checkperm):
173 """Possibly process a wire protocol request.
174
175 If the current request is a wire protocol request, the request is
176 processed by this function.
177
178 ``req`` is a ``parsedrequest`` instance.
179 ``res`` is a ``wsgiresponse`` instance.
180
181 Returns a bool indicating if the request was serviced. If set, the caller
182 should stop processing the request, as a response has already been issued.
183 """
184 # Avoid cycle involving hg module.
185 from .hgweb import common as hgwebcommon
186
187 repo = rctx.repo
188
189 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
190 # string parameter. If it isn't present, this isn't a wire protocol
191 # request.
192 if 'cmd' not in req.qsparams:
193 return False
194
195 cmd = req.qsparams['cmd']
196
197 # The "cmd" request parameter is used by both the wire protocol and hgweb.
198 # While not all wire protocol commands are available for all transports,
199 # if we see a "cmd" value that resembles a known wire protocol command, we
200 # route it to a protocol handler. This is better than routing possible
201 # wire protocol requests to hgweb because it prevents hgweb from using
202 # known wire protocol commands and it is less confusing for machine
203 # clients.
204 if not iscmd(cmd):
205 return False
206
207 # The "cmd" query string argument is only valid on the root path of the
208 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
209 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
210 # in this case. We send an HTTP 404 for backwards compatibility reasons.
211 if req.dispatchpath:
212 res.status = hgwebcommon.statusmessage(404)
213 res.headers['Content-Type'] = HGTYPE
214 # TODO This is not a good response to issue for this request. This
215 # is mostly for BC for now.
216 res.setbodybytes('0\n%s\n' % b'Not Found')
217 return True
218
219 proto = httpv1protocolhandler(req, repo.ui,
220 lambda perm: checkperm(rctx, req, perm))
221
222 # The permissions checker should be the only thing that can raise an
223 # ErrorResponse. It is kind of a layer violation to catch an hgweb
224 # exception here. So consider refactoring into a exception type that
225 # is associated with the wire protocol.
226 try:
227 _callhttp(repo, req, res, proto, cmd)
228 except hgwebcommon.ErrorResponse as e:
229 for k, v in e.headers:
230 res.headers[k] = v
231 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
232 # TODO This response body assumes the failed command was
233 # "unbundle." That assumption is not always valid.
234 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
235
236 return True
237
238 def handlewsgiapirequest(rctx, req, res, checkperm):
239 """Handle requests to /api/*."""
240 assert req.dispatchparts[0] == b'api'
241
242 repo = rctx.repo
243
244 # This whole URL space is experimental for now. But we want to
245 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
246 if not repo.ui.configbool('experimental', 'web.apiserver'):
247 res.status = b'404 Not Found'
248 res.headers[b'Content-Type'] = b'text/plain'
249 res.setbodybytes(_('Experimental API server endpoint not enabled'))
250 return
251
252 # The URL space is /api/<protocol>/*. The structure of URLs under varies
253 # by <protocol>.
254
255 # Registered APIs are made available via config options of the name of
256 # the protocol.
257 availableapis = set()
258 for k, v in API_HANDLERS.items():
259 section, option = v['config']
260 if repo.ui.configbool(section, option):
261 availableapis.add(k)
262
263 # Requests to /api/ list available APIs.
264 if req.dispatchparts == [b'api']:
265 res.status = b'200 OK'
266 res.headers[b'Content-Type'] = b'text/plain'
267 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
268 'one of the following:\n')]
269 if availableapis:
270 lines.extend(sorted(availableapis))
271 else:
272 lines.append(_('(no available APIs)\n'))
273 res.setbodybytes(b'\n'.join(lines))
274 return
275
276 proto = req.dispatchparts[1]
277
278 if proto not in API_HANDLERS:
279 res.status = b'404 Not Found'
280 res.headers[b'Content-Type'] = b'text/plain'
281 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
282 proto, b', '.join(sorted(availableapis))))
283 return
284
285 if proto not in availableapis:
286 res.status = b'404 Not Found'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('API %s not enabled\n') % proto)
289 return
290
291 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
292 req.dispatchparts[2:])
293
294 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
30 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
295 31 from .hgweb import common as hgwebcommon
296 32
297 33 # URL space looks like: <permissions>/<command>, where <permission> can
298 34 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
299 35
300 36 # Root URL does nothing meaningful... yet.
301 37 if not urlparts:
302 38 res.status = b'200 OK'
303 39 res.headers[b'Content-Type'] = b'text/plain'
304 40 res.setbodybytes(_('HTTP version 2 API handler'))
305 41 return
306 42
307 43 if len(urlparts) == 1:
308 44 res.status = b'404 Not Found'
309 45 res.headers[b'Content-Type'] = b'text/plain'
310 46 res.setbodybytes(_('do not know how to process %s\n') %
311 47 req.dispatchpath)
312 48 return
313 49
314 50 permission, command = urlparts[0:2]
315 51
316 52 if permission not in (b'ro', b'rw'):
317 53 res.status = b'404 Not Found'
318 54 res.headers[b'Content-Type'] = b'text/plain'
319 55 res.setbodybytes(_('unknown permission: %s') % permission)
320 56 return
321 57
322 58 if req.method != 'POST':
323 59 res.status = b'405 Method Not Allowed'
324 60 res.headers[b'Allow'] = b'POST'
325 61 res.setbodybytes(_('commands require POST requests'))
326 62 return
327 63
328 64 # At some point we'll want to use our own API instead of recycling the
329 65 # behavior of version 1 of the wire protocol...
330 66 # TODO return reasonable responses - not responses that overload the
331 67 # HTTP status line message for error reporting.
332 68 try:
333 69 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
334 70 except hgwebcommon.ErrorResponse as e:
335 71 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
336 72 for k, v in e.headers:
337 73 res.headers[k] = v
338 74 res.setbodybytes('permission denied')
339 75 return
340 76
341 77 # We have a special endpoint to reflect the request back at the client.
342 78 if command == b'debugreflect':
343 79 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
344 80 return
345 81
346 82 # Extra commands that we handle that aren't really wire protocol
347 83 # commands. Think extra hard before making this hackery available to
348 84 # extension.
349 85 extracommands = {'multirequest'}
350 86
351 87 if command not in wireproto.commandsv2 and command not in extracommands:
352 88 res.status = b'404 Not Found'
353 89 res.headers[b'Content-Type'] = b'text/plain'
354 90 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
355 91 return
356 92
357 93 repo = rctx.repo
358 94 ui = repo.ui
359 95
360 96 proto = httpv2protocolhandler(req, ui)
361 97
362 98 if (not wireproto.commandsv2.commandavailable(command, proto)
363 99 and command not in extracommands):
364 100 res.status = b'404 Not Found'
365 101 res.headers[b'Content-Type'] = b'text/plain'
366 102 res.setbodybytes(_('invalid wire protocol command: %s') % command)
367 103 return
368 104
369 105 # TODO consider cases where proxies may add additional Accept headers.
370 106 if req.headers.get(b'Accept') != FRAMINGTYPE:
371 107 res.status = b'406 Not Acceptable'
372 108 res.headers[b'Content-Type'] = b'text/plain'
373 109 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
374 110 % FRAMINGTYPE)
375 111 return
376 112
377 113 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
378 114 res.status = b'415 Unsupported Media Type'
379 115 # TODO we should send a response with appropriate media type,
380 116 # since client does Accept it.
381 117 res.headers[b'Content-Type'] = b'text/plain'
382 118 res.setbodybytes(_('client MUST send Content-Type header with '
383 119 'value: %s\n') % FRAMINGTYPE)
384 120 return
385 121
386 122 _processhttpv2request(ui, repo, req, res, permission, command, proto)
387 123
388 124 def _processhttpv2reflectrequest(ui, repo, req, res):
389 125 """Reads unified frame protocol request and dumps out state to client.
390 126
391 127 This special endpoint can be used to help debug the wire protocol.
392 128
393 129 Instead of routing the request through the normal dispatch mechanism,
394 130 we instead read all frames, decode them, and feed them into our state
395 131 tracker. We then dump the log of all that activity back out to the
396 132 client.
397 133 """
398 134 import json
399 135
400 136 # Reflection APIs have a history of being abused, accidentally disclosing
401 137 # sensitive data, etc. So we have a config knob.
402 138 if not ui.configbool('experimental', 'web.api.debugreflect'):
403 139 res.status = b'404 Not Found'
404 140 res.headers[b'Content-Type'] = b'text/plain'
405 141 res.setbodybytes(_('debugreflect service not available'))
406 142 return
407 143
408 144 # We assume we have a unified framing protocol request body.
409 145
410 146 reactor = wireprotoframing.serverreactor()
411 147 states = []
412 148
413 149 while True:
414 150 frame = wireprotoframing.readframe(req.bodyfh)
415 151
416 152 if not frame:
417 153 states.append(b'received: <no frame>')
418 154 break
419 155
420 156 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
421 157 frame.requestid,
422 158 frame.payload))
423 159
424 160 action, meta = reactor.onframerecv(frame)
425 161 states.append(json.dumps((action, meta), sort_keys=True,
426 162 separators=(', ', ': ')))
427 163
428 164 action, meta = reactor.oninputeof()
429 165 meta['action'] = action
430 166 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
431 167
432 168 res.status = b'200 OK'
433 169 res.headers[b'Content-Type'] = b'text/plain'
434 170 res.setbodybytes(b'\n'.join(states))
435 171
436 172 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
437 173 """Post-validation handler for HTTPv2 requests.
438 174
439 175 Called when the HTTP request contains unified frame-based protocol
440 176 frames for evaluation.
441 177 """
442 178 # TODO Some HTTP clients are full duplex and can receive data before
443 179 # the entire request is transmitted. Figure out a way to indicate support
444 180 # for that so we can opt into full duplex mode.
445 181 reactor = wireprotoframing.serverreactor(deferoutput=True)
446 182 seencommand = False
447 183
448 184 outstream = reactor.makeoutputstream()
449 185
450 186 while True:
451 187 frame = wireprotoframing.readframe(req.bodyfh)
452 188 if not frame:
453 189 break
454 190
455 191 action, meta = reactor.onframerecv(frame)
456 192
457 193 if action == 'wantframe':
458 194 # Need more data before we can do anything.
459 195 continue
460 196 elif action == 'runcommand':
461 197 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
462 198 reqcommand, reactor, outstream,
463 199 meta, issubsequent=seencommand)
464 200
465 201 if sentoutput:
466 202 return
467 203
468 204 seencommand = True
469 205
470 206 elif action == 'error':
471 207 # TODO define proper error mechanism.
472 208 res.status = b'200 OK'
473 209 res.headers[b'Content-Type'] = b'text/plain'
474 210 res.setbodybytes(meta['message'] + b'\n')
475 211 return
476 212 else:
477 213 raise error.ProgrammingError(
478 214 'unhandled action from frame processor: %s' % action)
479 215
480 216 action, meta = reactor.oninputeof()
481 217 if action == 'sendframes':
482 218 # We assume we haven't started sending the response yet. If we're
483 219 # wrong, the response type will raise an exception.
484 220 res.status = b'200 OK'
485 221 res.headers[b'Content-Type'] = FRAMINGTYPE
486 222 res.setbodygen(meta['framegen'])
487 223 elif action == 'noop':
488 224 pass
489 225 else:
490 226 raise error.ProgrammingError('unhandled action from frame processor: %s'
491 227 % action)
492 228
493 229 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
494 230 outstream, command, issubsequent):
495 231 """Dispatch a wire protocol command made from HTTPv2 requests.
496 232
497 233 The authenticated permission (``authedperm``) along with the original
498 234 command from the URL (``reqcommand``) are passed in.
499 235 """
500 236 # We already validated that the session has permissions to perform the
501 237 # actions in ``authedperm``. In the unified frame protocol, the canonical
502 238 # command to run is expressed in a frame. However, the URL also requested
503 239 # to run a specific command. We need to be careful that the command we
504 240 # run doesn't have permissions requirements greater than what was granted
505 241 # by ``authedperm``.
506 242 #
507 243 # Our rule for this is we only allow one command per HTTP request and
508 244 # that command must match the command in the URL. However, we make
509 245 # an exception for the ``multirequest`` URL. This URL is allowed to
510 246 # execute multiple commands. We double check permissions of each command
511 247 # as it is invoked to ensure there is no privilege escalation.
512 248 # TODO consider allowing multiple commands to regular command URLs
513 249 # iff each command is the same.
514 250
515 251 proto = httpv2protocolhandler(req, ui, args=command['args'])
516 252
517 253 if reqcommand == b'multirequest':
518 254 if not wireproto.commandsv2.commandavailable(command['command'], proto):
519 255 # TODO proper error mechanism
520 256 res.status = b'200 OK'
521 257 res.headers[b'Content-Type'] = b'text/plain'
522 258 res.setbodybytes(_('wire protocol command not available: %s') %
523 259 command['command'])
524 260 return True
525 261
526 262 # TODO don't use assert here, since it may be elided by -O.
527 263 assert authedperm in (b'ro', b'rw')
528 264 wirecommand = wireproto.commandsv2[command['command']]
529 265 assert wirecommand.permission in ('push', 'pull')
530 266
531 267 if authedperm == b'ro' and wirecommand.permission != 'pull':
532 268 # TODO proper error mechanism
533 269 res.status = b'403 Forbidden'
534 270 res.headers[b'Content-Type'] = b'text/plain'
535 271 res.setbodybytes(_('insufficient permissions to execute '
536 272 'command: %s') % command['command'])
537 273 return True
538 274
539 275 # TODO should we also call checkperm() here? Maybe not if we're going
540 276 # to overhaul that API. The granted scope from the URL check should
541 277 # be good enough.
542 278
543 279 else:
544 280 # Don't allow multiple commands outside of ``multirequest`` URL.
545 281 if issubsequent:
546 282 # TODO proper error mechanism
547 283 res.status = b'200 OK'
548 284 res.headers[b'Content-Type'] = b'text/plain'
549 285 res.setbodybytes(_('multiple commands cannot be issued to this '
550 286 'URL'))
551 287 return True
552 288
553 289 if reqcommand != command['command']:
554 290 # TODO define proper error mechanism
555 291 res.status = b'200 OK'
556 292 res.headers[b'Content-Type'] = b'text/plain'
557 293 res.setbodybytes(_('command in frame must match command in URL'))
558 294 return True
559 295
560 296 rsp = wireproto.dispatch(repo, proto, command['command'])
561 297
562 298 res.status = b'200 OK'
563 299 res.headers[b'Content-Type'] = FRAMINGTYPE
564 300
565 301 if isinstance(rsp, wireprototypes.bytesresponse):
566 302 action, meta = reactor.onbytesresponseready(outstream,
567 303 command['requestid'],
568 304 rsp.data)
569 305 elif isinstance(rsp, wireprototypes.cborresponse):
570 306 encoded = cbor.dumps(rsp.value, canonical=True)
571 307 action, meta = reactor.onbytesresponseready(outstream,
572 308 command['requestid'],
573 309 encoded,
574 310 iscbor=True)
575 311 else:
576 312 action, meta = reactor.onapplicationerror(
577 313 _('unhandled response type from wire proto command'))
578 314
579 315 if action == 'sendframes':
580 316 res.setbodygen(meta['framegen'])
581 317 return True
582 318 elif action == 'noop':
583 319 return False
584 320 else:
585 321 raise error.ProgrammingError('unhandled event from reactor: %s' %
586 322 action)
587 323
588 # Maps API name to metadata so custom API can be registered.
589 API_HANDLERS = {
590 HTTPV2: {
591 'config': ('experimental', 'web.api.http-v2'),
592 'handler': _handlehttpv2request,
593 },
594 }
595
596 324 @zi.implementer(wireprototypes.baseprotocolhandler)
597 325 class httpv2protocolhandler(object):
598 326 def __init__(self, req, ui, args=None):
599 327 self._req = req
600 328 self._ui = ui
601 329 self._args = args
602 330
603 331 @property
604 332 def name(self):
605 333 return HTTPV2
606 334
607 335 def getargs(self, args):
608 336 data = {}
609 337 for k, typ in args.items():
610 338 if k == '*':
611 339 raise NotImplementedError('do not support * args')
612 340 elif k in self._args:
613 341 # TODO consider validating value types.
614 342 data[k] = self._args[k]
615 343
616 344 return data
617 345
618 346 def getprotocaps(self):
619 347 # Protocol capabilities are currently not implemented for HTTP V2.
620 348 return set()
621 349
622 350 def getpayload(self):
623 351 raise NotImplementedError
624 352
625 353 @contextlib.contextmanager
626 354 def mayberedirectstdio(self):
627 355 raise NotImplementedError
628 356
629 357 def client(self):
630 358 raise NotImplementedError
631 359
632 360 def addcapabilities(self, repo, caps):
633 361 return caps
634 362
635 363 def checkperm(self, perm):
636 364 raise NotImplementedError
637
638 def _httpresponsetype(ui, proto, prefer_uncompressed):
639 """Determine the appropriate response type and compression settings.
640
641 Returns a tuple of (mediatype, compengine, engineopts).
642 """
643 # Determine the response media type and compression engine based
644 # on the request parameters.
645
646 if '0.2' in proto.getprotocaps():
647 # All clients are expected to support uncompressed data.
648 if prefer_uncompressed:
649 return HGTYPE2, util._noopengine(), {}
650
651 # Now find an agreed upon compression format.
652 compformats = wireproto.clientcompressionsupport(proto)
653 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
654 if engine.wireprotosupport().name in compformats:
655 opts = {}
656 level = ui.configint('server', '%slevel' % engine.name())
657 if level is not None:
658 opts['level'] = level
659
660 return HGTYPE2, engine, opts
661
662 # No mutually supported compression format. Fall back to the
663 # legacy protocol.
664
665 # Don't allow untrusted settings because disabling compression or
666 # setting a very high compression level could lead to flooding
667 # the server's network or CPU.
668 opts = {'level': ui.configint('server', 'zliblevel')}
669 return HGTYPE, util.compengines['zlib'], opts
670
671 def _callhttp(repo, req, res, proto, cmd):
672 # Avoid cycle involving hg module.
673 from .hgweb import common as hgwebcommon
674
675 def genversion2(gen, engine, engineopts):
676 # application/mercurial-0.2 always sends a payload header
677 # identifying the compression engine.
678 name = engine.wireprotosupport().name
679 assert 0 < len(name) < 256
680 yield struct.pack('B', len(name))
681 yield name
682
683 for chunk in gen:
684 yield chunk
685
686 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
687 if code == HTTP_OK:
688 res.status = '200 Script output follows'
689 else:
690 res.status = hgwebcommon.statusmessage(code)
691
692 res.headers['Content-Type'] = contenttype
693
694 if bodybytes is not None:
695 res.setbodybytes(bodybytes)
696 if bodygen is not None:
697 res.setbodygen(bodygen)
698
699 if not wireproto.commands.commandavailable(cmd, proto):
700 setresponse(HTTP_OK, HGERRTYPE,
701 _('requested wire protocol command is not available over '
702 'HTTP'))
703 return
704
705 proto.checkperm(wireproto.commands[cmd].permission)
706
707 rsp = wireproto.dispatch(repo, proto, cmd)
708
709 if isinstance(rsp, bytes):
710 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
711 elif isinstance(rsp, wireprototypes.bytesresponse):
712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
713 elif isinstance(rsp, wireprototypes.streamreslegacy):
714 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
715 elif isinstance(rsp, wireprototypes.streamres):
716 gen = rsp.gen
717
718 # This code for compression should not be streamres specific. It
719 # is here because we only compress streamres at the moment.
720 mediatype, engine, engineopts = _httpresponsetype(
721 repo.ui, proto, rsp.prefer_uncompressed)
722 gen = engine.compressstream(gen, engineopts)
723
724 if mediatype == HGTYPE2:
725 gen = genversion2(gen, engine, engineopts)
726
727 setresponse(HTTP_OK, mediatype, bodygen=gen)
728 elif isinstance(rsp, wireprototypes.pushres):
729 rsp = '%d\n%s' % (rsp.res, rsp.output)
730 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
731 elif isinstance(rsp, wireprototypes.pusherr):
732 rsp = '0\n%s\n' % rsp.res
733 res.drain = True
734 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
735 elif isinstance(rsp, wireprototypes.ooberror):
736 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
737 else:
738 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
739
740 def _sshv1respondbytes(fout, value):
741 """Send a bytes response for protocol version 1."""
742 fout.write('%d\n' % len(value))
743 fout.write(value)
744 fout.flush()
745
746 def _sshv1respondstream(fout, source):
747 write = fout.write
748 for chunk in source.gen:
749 write(chunk)
750 fout.flush()
751
752 def _sshv1respondooberror(fout, ferr, rsp):
753 ferr.write(b'%s\n-\n' % rsp)
754 ferr.flush()
755 fout.write(b'\n')
756 fout.flush()
757
758 @zi.implementer(wireprototypes.baseprotocolhandler)
759 class sshv1protocolhandler(object):
760 """Handler for requests services via version 1 of SSH protocol."""
761 def __init__(self, ui, fin, fout):
762 self._ui = ui
763 self._fin = fin
764 self._fout = fout
765 self._protocaps = set()
766
767 @property
768 def name(self):
769 return wireprototypes.SSHV1
770
771 def getargs(self, args):
772 data = {}
773 keys = args.split()
774 for n in xrange(len(keys)):
775 argline = self._fin.readline()[:-1]
776 arg, l = argline.split()
777 if arg not in keys:
778 raise error.Abort(_("unexpected parameter %r") % arg)
779 if arg == '*':
780 star = {}
781 for k in xrange(int(l)):
782 argline = self._fin.readline()[:-1]
783 arg, l = argline.split()
784 val = self._fin.read(int(l))
785 star[arg] = val
786 data['*'] = star
787 else:
788 val = self._fin.read(int(l))
789 data[arg] = val
790 return [data[k] for k in keys]
791
792 def getprotocaps(self):
793 return self._protocaps
794
795 def getpayload(self):
796 # We initially send an empty response. This tells the client it is
797 # OK to start sending data. If a client sees any other response, it
798 # interprets it as an error.
799 _sshv1respondbytes(self._fout, b'')
800
801 # The file is in the form:
802 #
803 # <chunk size>\n<chunk>
804 # ...
805 # 0\n
806 count = int(self._fin.readline())
807 while count:
808 yield self._fin.read(count)
809 count = int(self._fin.readline())
810
811 @contextlib.contextmanager
812 def mayberedirectstdio(self):
813 yield None
814
815 def client(self):
816 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
817 return 'remote:ssh:' + client
818
819 def addcapabilities(self, repo, caps):
820 if self.name == wireprototypes.SSHV1:
821 caps.append(b'protocaps')
822 caps.append(b'batch')
823 return caps
824
825 def checkperm(self, perm):
826 pass
827
828 class sshv2protocolhandler(sshv1protocolhandler):
829 """Protocol handler for version 2 of the SSH protocol."""
830
831 @property
832 def name(self):
833 return wireprototypes.SSHV2
834
835 def addcapabilities(self, repo, caps):
836 return caps
837
838 def _runsshserver(ui, repo, fin, fout, ev):
839 # This function operates like a state machine of sorts. The following
840 # states are defined:
841 #
842 # protov1-serving
843 # Server is in protocol version 1 serving mode. Commands arrive on
844 # new lines. These commands are processed in this state, one command
845 # after the other.
846 #
847 # protov2-serving
848 # Server is in protocol version 2 serving mode.
849 #
850 # upgrade-initial
851 # The server is going to process an upgrade request.
852 #
853 # upgrade-v2-filter-legacy-handshake
854 # The protocol is being upgraded to version 2. The server is expecting
855 # the legacy handshake from version 1.
856 #
857 # upgrade-v2-finish
858 # The upgrade to version 2 of the protocol is imminent.
859 #
860 # shutdown
861 # The server is shutting down, possibly in reaction to a client event.
862 #
863 # And here are their transitions:
864 #
865 # protov1-serving -> shutdown
866 # When server receives an empty request or encounters another
867 # error.
868 #
869 # protov1-serving -> upgrade-initial
870 # An upgrade request line was seen.
871 #
872 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
873 # Upgrade to version 2 in progress. Server is expecting to
874 # process a legacy handshake.
875 #
876 # upgrade-v2-filter-legacy-handshake -> shutdown
877 # Client did not fulfill upgrade handshake requirements.
878 #
879 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
880 # Client fulfilled version 2 upgrade requirements. Finishing that
881 # upgrade.
882 #
883 # upgrade-v2-finish -> protov2-serving
884 # Protocol upgrade to version 2 complete. Server can now speak protocol
885 # version 2.
886 #
887 # protov2-serving -> protov1-serving
888 # Ths happens by default since protocol version 2 is the same as
889 # version 1 except for the handshake.
890
891 state = 'protov1-serving'
892 proto = sshv1protocolhandler(ui, fin, fout)
893 protoswitched = False
894
895 while not ev.is_set():
896 if state == 'protov1-serving':
897 # Commands are issued on new lines.
898 request = fin.readline()[:-1]
899
900 # Empty lines signal to terminate the connection.
901 if not request:
902 state = 'shutdown'
903 continue
904
905 # It looks like a protocol upgrade request. Transition state to
906 # handle it.
907 if request.startswith(b'upgrade '):
908 if protoswitched:
909 _sshv1respondooberror(fout, ui.ferr,
910 b'cannot upgrade protocols multiple '
911 b'times')
912 state = 'shutdown'
913 continue
914
915 state = 'upgrade-initial'
916 continue
917
918 available = wireproto.commands.commandavailable(request, proto)
919
920 # This command isn't available. Send an empty response and go
921 # back to waiting for a new command.
922 if not available:
923 _sshv1respondbytes(fout, b'')
924 continue
925
926 rsp = wireproto.dispatch(repo, proto, request)
927
928 if isinstance(rsp, bytes):
929 _sshv1respondbytes(fout, rsp)
930 elif isinstance(rsp, wireprototypes.bytesresponse):
931 _sshv1respondbytes(fout, rsp.data)
932 elif isinstance(rsp, wireprototypes.streamres):
933 _sshv1respondstream(fout, rsp)
934 elif isinstance(rsp, wireprototypes.streamreslegacy):
935 _sshv1respondstream(fout, rsp)
936 elif isinstance(rsp, wireprototypes.pushres):
937 _sshv1respondbytes(fout, b'')
938 _sshv1respondbytes(fout, b'%d' % rsp.res)
939 elif isinstance(rsp, wireprototypes.pusherr):
940 _sshv1respondbytes(fout, rsp.res)
941 elif isinstance(rsp, wireprototypes.ooberror):
942 _sshv1respondooberror(fout, ui.ferr, rsp.message)
943 else:
944 raise error.ProgrammingError('unhandled response type from '
945 'wire protocol command: %s' % rsp)
946
947 # For now, protocol version 2 serving just goes back to version 1.
948 elif state == 'protov2-serving':
949 state = 'protov1-serving'
950 continue
951
952 elif state == 'upgrade-initial':
953 # We should never transition into this state if we've switched
954 # protocols.
955 assert not protoswitched
956 assert proto.name == wireprototypes.SSHV1
957
958 # Expected: upgrade <token> <capabilities>
959 # If we get something else, the request is malformed. It could be
960 # from a future client that has altered the upgrade line content.
961 # We treat this as an unknown command.
962 try:
963 token, caps = request.split(b' ')[1:]
964 except ValueError:
965 _sshv1respondbytes(fout, b'')
966 state = 'protov1-serving'
967 continue
968
969 # Send empty response if we don't support upgrading protocols.
970 if not ui.configbool('experimental', 'sshserver.support-v2'):
971 _sshv1respondbytes(fout, b'')
972 state = 'protov1-serving'
973 continue
974
975 try:
976 caps = urlreq.parseqs(caps)
977 except ValueError:
978 _sshv1respondbytes(fout, b'')
979 state = 'protov1-serving'
980 continue
981
982 # We don't see an upgrade request to protocol version 2. Ignore
983 # the upgrade request.
984 wantedprotos = caps.get(b'proto', [b''])[0]
985 if SSHV2 not in wantedprotos:
986 _sshv1respondbytes(fout, b'')
987 state = 'protov1-serving'
988 continue
989
990 # It looks like we can honor this upgrade request to protocol 2.
991 # Filter the rest of the handshake protocol request lines.
992 state = 'upgrade-v2-filter-legacy-handshake'
993 continue
994
995 elif state == 'upgrade-v2-filter-legacy-handshake':
996 # Client should have sent legacy handshake after an ``upgrade``
997 # request. Expected lines:
998 #
999 # hello
1000 # between
1001 # pairs 81
1002 # 0000...-0000...
1003
1004 ok = True
1005 for line in (b'hello', b'between', b'pairs 81'):
1006 request = fin.readline()[:-1]
1007
1008 if request != line:
1009 _sshv1respondooberror(fout, ui.ferr,
1010 b'malformed handshake protocol: '
1011 b'missing %s' % line)
1012 ok = False
1013 state = 'shutdown'
1014 break
1015
1016 if not ok:
1017 continue
1018
1019 request = fin.read(81)
1020 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
1021 _sshv1respondooberror(fout, ui.ferr,
1022 b'malformed handshake protocol: '
1023 b'missing between argument value')
1024 state = 'shutdown'
1025 continue
1026
1027 state = 'upgrade-v2-finish'
1028 continue
1029
1030 elif state == 'upgrade-v2-finish':
1031 # Send the upgrade response.
1032 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1033 servercaps = wireproto.capabilities(repo, proto)
1034 rsp = b'capabilities: %s' % servercaps.data
1035 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1036 fout.flush()
1037
1038 proto = sshv2protocolhandler(ui, fin, fout)
1039 protoswitched = True
1040
1041 state = 'protov2-serving'
1042 continue
1043
1044 elif state == 'shutdown':
1045 break
1046
1047 else:
1048 raise error.ProgrammingError('unhandled ssh server state: %s' %
1049 state)
1050
1051 class sshserver(object):
1052 def __init__(self, ui, repo, logfh=None):
1053 self._ui = ui
1054 self._repo = repo
1055 self._fin = ui.fin
1056 self._fout = ui.fout
1057
1058 # Log write I/O to stdout and stderr if configured.
1059 if logfh:
1060 self._fout = util.makeloggingfileobject(
1061 logfh, self._fout, 'o', logdata=True)
1062 ui.ferr = util.makeloggingfileobject(
1063 logfh, ui.ferr, 'e', logdata=True)
1064
1065 hook.redirect(True)
1066 ui.fout = repo.ui.fout = ui.ferr
1067
1068 # Prevent insertion/deletion of CRs
1069 procutil.setbinary(self._fin)
1070 procutil.setbinary(self._fout)
1071
1072 def serve_forever(self):
1073 self.serveuntil(threading.Event())
1074 sys.exit(0)
1075
1076 def serveuntil(self, ev):
1077 """Serve until a threading.Event is set."""
1078 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,146 +1,147 b''
1 1 # Test that certain objects conform to well-defined interfaces.
2 2
3 3 from __future__ import absolute_import, print_function
4 4
5 5 import os
6 6
7 7 from mercurial.thirdparty.zope import (
8 8 interface as zi,
9 9 )
10 10 from mercurial.thirdparty.zope.interface import (
11 11 verify as ziverify,
12 12 )
13 13 from mercurial import (
14 14 bundlerepo,
15 15 filelog,
16 16 httppeer,
17 17 localrepo,
18 18 repository,
19 19 sshpeer,
20 20 statichttprepo,
21 21 ui as uimod,
22 22 unionrepo,
23 23 vfs as vfsmod,
24 24 wireprotoserver,
25 25 wireprototypes,
26 wireprotov2server,
26 27 )
27 28
28 29 rootdir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
29 30
30 31 def checkzobject(o, allowextra=False):
31 32 """Verify an object with a zope interface."""
32 33 ifaces = zi.providedBy(o)
33 34 if not ifaces:
34 35 print('%r does not provide any zope interfaces' % o)
35 36 return
36 37
37 38 # Run zope.interface's built-in verification routine. This verifies that
38 39 # everything that is supposed to be present is present.
39 40 for iface in ifaces:
40 41 ziverify.verifyObject(iface, o)
41 42
42 43 if allowextra:
43 44 return
44 45
45 46 # Now verify that the object provides no extra public attributes that
46 47 # aren't declared as part of interfaces.
47 48 allowed = set()
48 49 for iface in ifaces:
49 50 allowed |= set(iface.names(all=True))
50 51
51 52 public = {a for a in dir(o) if not a.startswith('_')}
52 53
53 54 for attr in sorted(public - allowed):
54 55 print('public attribute not declared in interfaces: %s.%s' % (
55 56 o.__class__.__name__, attr))
56 57
57 58 # Facilitates testing localpeer.
58 59 class dummyrepo(object):
59 60 def __init__(self):
60 61 self.ui = uimod.ui()
61 62 def filtered(self, name):
62 63 pass
63 64 def _restrictcapabilities(self, caps):
64 65 pass
65 66
66 67 class dummyopener(object):
67 68 handlers = []
68 69
69 70 # Facilitates testing sshpeer without requiring an SSH server.
70 71 class badpeer(httppeer.httppeer):
71 72 def __init__(self):
72 73 super(badpeer, self).__init__(None, None, None, dummyopener())
73 74 self.badattribute = True
74 75
75 76 def badmethod(self):
76 77 pass
77 78
78 79 class dummypipe(object):
79 80 def close(self):
80 81 pass
81 82
82 83 def main():
83 84 ui = uimod.ui()
84 85 # Needed so we can open a local repo with obsstore without a warning.
85 86 ui.setconfig('experimental', 'evolution.createmarkers', True)
86 87
87 88 checkzobject(badpeer())
88 89
89 90 ziverify.verifyClass(repository.ipeerbaselegacycommands,
90 91 httppeer.httppeer)
91 92 checkzobject(httppeer.httppeer(None, None, None, dummyopener()))
92 93
93 94 ziverify.verifyClass(repository.ipeerbase,
94 95 localrepo.localpeer)
95 96 checkzobject(localrepo.localpeer(dummyrepo()))
96 97
97 98 ziverify.verifyClass(repository.ipeerbaselegacycommands,
98 99 sshpeer.sshv1peer)
99 100 checkzobject(sshpeer.sshv1peer(ui, 'ssh://localhost/foo', None, dummypipe(),
100 101 dummypipe(), None, None))
101 102
102 103 ziverify.verifyClass(repository.ipeerbaselegacycommands,
103 104 sshpeer.sshv2peer)
104 105 checkzobject(sshpeer.sshv2peer(ui, 'ssh://localhost/foo', None, dummypipe(),
105 106 dummypipe(), None, None))
106 107
107 108 ziverify.verifyClass(repository.ipeerbase, bundlerepo.bundlepeer)
108 109 checkzobject(bundlerepo.bundlepeer(dummyrepo()))
109 110
110 111 ziverify.verifyClass(repository.ipeerbase, statichttprepo.statichttppeer)
111 112 checkzobject(statichttprepo.statichttppeer(dummyrepo()))
112 113
113 114 ziverify.verifyClass(repository.ipeerbase, unionrepo.unionpeer)
114 115 checkzobject(unionrepo.unionpeer(dummyrepo()))
115 116
116 117 ziverify.verifyClass(repository.completelocalrepository,
117 118 localrepo.localrepository)
118 119 repo = localrepo.localrepository(ui, rootdir)
119 120 checkzobject(repo)
120 121
121 122 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
122 123 wireprotoserver.sshv1protocolhandler)
123 124 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
124 125 wireprotoserver.sshv2protocolhandler)
125 126 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
126 127 wireprotoserver.httpv1protocolhandler)
127 128 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
128 wireprotoserver.httpv2protocolhandler)
129 wireprotov2server.httpv2protocolhandler)
129 130
130 131 sshv1 = wireprotoserver.sshv1protocolhandler(None, None, None)
131 132 checkzobject(sshv1)
132 133 sshv2 = wireprotoserver.sshv2protocolhandler(None, None, None)
133 134 checkzobject(sshv2)
134 135
135 136 httpv1 = wireprotoserver.httpv1protocolhandler(None, None, None)
136 137 checkzobject(httpv1)
137 httpv2 = wireprotoserver.httpv2protocolhandler(None, None)
138 httpv2 = wireprotov2server.httpv2protocolhandler(None, None)
138 139 checkzobject(httpv2)
139 140
140 141 ziverify.verifyClass(repository.ifilestorage, filelog.filelog)
141 142
142 143 vfs = vfsmod.vfs('.')
143 144 fl = filelog.filelog(vfs, 'dummy.i')
144 145 checkzobject(fl, allowextra=True)
145 146
146 147 main()
General Comments 0
You need to be logged in to leave comments. Login now