##// END OF EJS Templates
wireproto: extract HTTP version 2 code to own module...
Gregory Szorc -
r37563:93397c46 default
parent child Browse files
Show More
@@ -29,7 +29,7 b' from . import ('
29 29 util,
30 30 wireproto,
31 31 wireprotoframing,
32 wireprotoserver,
32 wireprotov2server,
33 33 )
34 34
35 35 httplib = util.httplib
@@ -504,13 +504,13 b' class httpv2peer(object):'
504 504 'pull': 'ro',
505 505 }[permission]
506 506
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
508 name)
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotov2server.HTTPV2,
508 permission, name)
509 509
510 510 # TODO modify user-agent to reflect v2.
511 511 headers = {
512 r'Accept': wireprotoserver.FRAMINGTYPE,
513 r'Content-Type': wireprotoserver.FRAMINGTYPE,
512 r'Accept': wireprotov2server.FRAMINGTYPE,
513 r'Content-Type': wireprotov2server.FRAMINGTYPE,
514 514 }
515 515
516 516 # TODO this should be part of a generic peer for the frame-based
@@ -12,9 +12,6 b' import sys'
12 12 import threading
13 13
14 14 from .i18n import _
15 from .thirdparty import (
16 cbor,
17 )
18 15 from .thirdparty.zope import (
19 16 interface as zi,
20 17 )
@@ -25,8 +22,8 b' from . import ('
25 22 pycompat,
26 23 util,
27 24 wireproto,
28 wireprotoframing,
29 25 wireprototypes,
26 wireprotov2server,
30 27 )
31 28 from .utils import (
32 29 procutil,
@@ -42,9 +39,7 b' HTTP_OK = 200'
42 39 HGTYPE = 'application/mercurial-0.1'
43 40 HGTYPE2 = 'application/mercurial-0.2'
44 41 HGERRTYPE = 'application/hg-error'
45 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
46 42
47 HTTPV2 = wireprototypes.HTTPV2
48 43 SSHV1 = wireprototypes.SSHV1
49 44 SSHV2 = wireprototypes.SSHV2
50 45
@@ -291,350 +286,14 b' def handlewsgiapirequest(rctx, req, res,'
291 286 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
292 287 req.dispatchparts[2:])
293 288
294 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
295 from .hgweb import common as hgwebcommon
296
297 # URL space looks like: <permissions>/<command>, where <permission> can
298 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
299
300 # Root URL does nothing meaningful... yet.
301 if not urlparts:
302 res.status = b'200 OK'
303 res.headers[b'Content-Type'] = b'text/plain'
304 res.setbodybytes(_('HTTP version 2 API handler'))
305 return
306
307 if len(urlparts) == 1:
308 res.status = b'404 Not Found'
309 res.headers[b'Content-Type'] = b'text/plain'
310 res.setbodybytes(_('do not know how to process %s\n') %
311 req.dispatchpath)
312 return
313
314 permission, command = urlparts[0:2]
315
316 if permission not in (b'ro', b'rw'):
317 res.status = b'404 Not Found'
318 res.headers[b'Content-Type'] = b'text/plain'
319 res.setbodybytes(_('unknown permission: %s') % permission)
320 return
321
322 if req.method != 'POST':
323 res.status = b'405 Method Not Allowed'
324 res.headers[b'Allow'] = b'POST'
325 res.setbodybytes(_('commands require POST requests'))
326 return
327
328 # At some point we'll want to use our own API instead of recycling the
329 # behavior of version 1 of the wire protocol...
330 # TODO return reasonable responses - not responses that overload the
331 # HTTP status line message for error reporting.
332 try:
333 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
334 except hgwebcommon.ErrorResponse as e:
335 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
336 for k, v in e.headers:
337 res.headers[k] = v
338 res.setbodybytes('permission denied')
339 return
340
341 # We have a special endpoint to reflect the request back at the client.
342 if command == b'debugreflect':
343 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
344 return
345
346 # Extra commands that we handle that aren't really wire protocol
347 # commands. Think extra hard before making this hackery available to
348 # extension.
349 extracommands = {'multirequest'}
350
351 if command not in wireproto.commandsv2 and command not in extracommands:
352 res.status = b'404 Not Found'
353 res.headers[b'Content-Type'] = b'text/plain'
354 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
355 return
356
357 repo = rctx.repo
358 ui = repo.ui
359
360 proto = httpv2protocolhandler(req, ui)
361
362 if (not wireproto.commandsv2.commandavailable(command, proto)
363 and command not in extracommands):
364 res.status = b'404 Not Found'
365 res.headers[b'Content-Type'] = b'text/plain'
366 res.setbodybytes(_('invalid wire protocol command: %s') % command)
367 return
368
369 # TODO consider cases where proxies may add additional Accept headers.
370 if req.headers.get(b'Accept') != FRAMINGTYPE:
371 res.status = b'406 Not Acceptable'
372 res.headers[b'Content-Type'] = b'text/plain'
373 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
374 % FRAMINGTYPE)
375 return
376
377 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
378 res.status = b'415 Unsupported Media Type'
379 # TODO we should send a response with appropriate media type,
380 # since client does Accept it.
381 res.headers[b'Content-Type'] = b'text/plain'
382 res.setbodybytes(_('client MUST send Content-Type header with '
383 'value: %s\n') % FRAMINGTYPE)
384 return
385
386 _processhttpv2request(ui, repo, req, res, permission, command, proto)
387
388 def _processhttpv2reflectrequest(ui, repo, req, res):
389 """Reads unified frame protocol request and dumps out state to client.
390
391 This special endpoint can be used to help debug the wire protocol.
392
393 Instead of routing the request through the normal dispatch mechanism,
394 we instead read all frames, decode them, and feed them into our state
395 tracker. We then dump the log of all that activity back out to the
396 client.
397 """
398 import json
399
400 # Reflection APIs have a history of being abused, accidentally disclosing
401 # sensitive data, etc. So we have a config knob.
402 if not ui.configbool('experimental', 'web.api.debugreflect'):
403 res.status = b'404 Not Found'
404 res.headers[b'Content-Type'] = b'text/plain'
405 res.setbodybytes(_('debugreflect service not available'))
406 return
407
408 # We assume we have a unified framing protocol request body.
409
410 reactor = wireprotoframing.serverreactor()
411 states = []
412
413 while True:
414 frame = wireprotoframing.readframe(req.bodyfh)
415
416 if not frame:
417 states.append(b'received: <no frame>')
418 break
419
420 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
421 frame.requestid,
422 frame.payload))
423
424 action, meta = reactor.onframerecv(frame)
425 states.append(json.dumps((action, meta), sort_keys=True,
426 separators=(', ', ': ')))
427
428 action, meta = reactor.oninputeof()
429 meta['action'] = action
430 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
431
432 res.status = b'200 OK'
433 res.headers[b'Content-Type'] = b'text/plain'
434 res.setbodybytes(b'\n'.join(states))
435
436 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
437 """Post-validation handler for HTTPv2 requests.
438
439 Called when the HTTP request contains unified frame-based protocol
440 frames for evaluation.
441 """
442 # TODO Some HTTP clients are full duplex and can receive data before
443 # the entire request is transmitted. Figure out a way to indicate support
444 # for that so we can opt into full duplex mode.
445 reactor = wireprotoframing.serverreactor(deferoutput=True)
446 seencommand = False
447
448 outstream = reactor.makeoutputstream()
449
450 while True:
451 frame = wireprotoframing.readframe(req.bodyfh)
452 if not frame:
453 break
454
455 action, meta = reactor.onframerecv(frame)
456
457 if action == 'wantframe':
458 # Need more data before we can do anything.
459 continue
460 elif action == 'runcommand':
461 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
462 reqcommand, reactor, outstream,
463 meta, issubsequent=seencommand)
464
465 if sentoutput:
466 return
467
468 seencommand = True
469
470 elif action == 'error':
471 # TODO define proper error mechanism.
472 res.status = b'200 OK'
473 res.headers[b'Content-Type'] = b'text/plain'
474 res.setbodybytes(meta['message'] + b'\n')
475 return
476 else:
477 raise error.ProgrammingError(
478 'unhandled action from frame processor: %s' % action)
479
480 action, meta = reactor.oninputeof()
481 if action == 'sendframes':
482 # We assume we haven't started sending the response yet. If we're
483 # wrong, the response type will raise an exception.
484 res.status = b'200 OK'
485 res.headers[b'Content-Type'] = FRAMINGTYPE
486 res.setbodygen(meta['framegen'])
487 elif action == 'noop':
488 pass
489 else:
490 raise error.ProgrammingError('unhandled action from frame processor: %s'
491 % action)
492
493 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
494 outstream, command, issubsequent):
495 """Dispatch a wire protocol command made from HTTPv2 requests.
496
497 The authenticated permission (``authedperm``) along with the original
498 command from the URL (``reqcommand``) are passed in.
499 """
500 # We already validated that the session has permissions to perform the
501 # actions in ``authedperm``. In the unified frame protocol, the canonical
502 # command to run is expressed in a frame. However, the URL also requested
503 # to run a specific command. We need to be careful that the command we
504 # run doesn't have permissions requirements greater than what was granted
505 # by ``authedperm``.
506 #
507 # Our rule for this is we only allow one command per HTTP request and
508 # that command must match the command in the URL. However, we make
509 # an exception for the ``multirequest`` URL. This URL is allowed to
510 # execute multiple commands. We double check permissions of each command
511 # as it is invoked to ensure there is no privilege escalation.
512 # TODO consider allowing multiple commands to regular command URLs
513 # iff each command is the same.
514
515 proto = httpv2protocolhandler(req, ui, args=command['args'])
516
517 if reqcommand == b'multirequest':
518 if not wireproto.commandsv2.commandavailable(command['command'], proto):
519 # TODO proper error mechanism
520 res.status = b'200 OK'
521 res.headers[b'Content-Type'] = b'text/plain'
522 res.setbodybytes(_('wire protocol command not available: %s') %
523 command['command'])
524 return True
525
526 # TODO don't use assert here, since it may be elided by -O.
527 assert authedperm in (b'ro', b'rw')
528 wirecommand = wireproto.commandsv2[command['command']]
529 assert wirecommand.permission in ('push', 'pull')
530
531 if authedperm == b'ro' and wirecommand.permission != 'pull':
532 # TODO proper error mechanism
533 res.status = b'403 Forbidden'
534 res.headers[b'Content-Type'] = b'text/plain'
535 res.setbodybytes(_('insufficient permissions to execute '
536 'command: %s') % command['command'])
537 return True
538
539 # TODO should we also call checkperm() here? Maybe not if we're going
540 # to overhaul that API. The granted scope from the URL check should
541 # be good enough.
542
543 else:
544 # Don't allow multiple commands outside of ``multirequest`` URL.
545 if issubsequent:
546 # TODO proper error mechanism
547 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = b'text/plain'
549 res.setbodybytes(_('multiple commands cannot be issued to this '
550 'URL'))
551 return True
552
553 if reqcommand != command['command']:
554 # TODO define proper error mechanism
555 res.status = b'200 OK'
556 res.headers[b'Content-Type'] = b'text/plain'
557 res.setbodybytes(_('command in frame must match command in URL'))
558 return True
559
560 rsp = wireproto.dispatch(repo, proto, command['command'])
561
562 res.status = b'200 OK'
563 res.headers[b'Content-Type'] = FRAMINGTYPE
564
565 if isinstance(rsp, wireprototypes.bytesresponse):
566 action, meta = reactor.onbytesresponseready(outstream,
567 command['requestid'],
568 rsp.data)
569 elif isinstance(rsp, wireprototypes.cborresponse):
570 encoded = cbor.dumps(rsp.value, canonical=True)
571 action, meta = reactor.onbytesresponseready(outstream,
572 command['requestid'],
573 encoded,
574 iscbor=True)
575 else:
576 action, meta = reactor.onapplicationerror(
577 _('unhandled response type from wire proto command'))
578
579 if action == 'sendframes':
580 res.setbodygen(meta['framegen'])
581 return True
582 elif action == 'noop':
583 return False
584 else:
585 raise error.ProgrammingError('unhandled event from reactor: %s' %
586 action)
587
588 289 # Maps API name to metadata so custom API can be registered.
589 290 API_HANDLERS = {
590 HTTPV2: {
291 wireprotov2server.HTTPV2: {
591 292 'config': ('experimental', 'web.api.http-v2'),
592 'handler': _handlehttpv2request,
293 'handler': wireprotov2server.handlehttpv2request,
593 294 },
594 295 }
595 296
596 @zi.implementer(wireprototypes.baseprotocolhandler)
597 class httpv2protocolhandler(object):
598 def __init__(self, req, ui, args=None):
599 self._req = req
600 self._ui = ui
601 self._args = args
602
603 @property
604 def name(self):
605 return HTTPV2
606
607 def getargs(self, args):
608 data = {}
609 for k, typ in args.items():
610 if k == '*':
611 raise NotImplementedError('do not support * args')
612 elif k in self._args:
613 # TODO consider validating value types.
614 data[k] = self._args[k]
615
616 return data
617
618 def getprotocaps(self):
619 # Protocol capabilities are currently not implemented for HTTP V2.
620 return set()
621
622 def getpayload(self):
623 raise NotImplementedError
624
625 @contextlib.contextmanager
626 def mayberedirectstdio(self):
627 raise NotImplementedError
628
629 def client(self):
630 raise NotImplementedError
631
632 def addcapabilities(self, repo, caps):
633 return caps
634
635 def checkperm(self, perm):
636 raise NotImplementedError
637
638 297 def _httpresponsetype(ui, proto, prefer_uncompressed):
639 298 """Determine the appropriate response type and compression settings.
640 299
This diff has been collapsed as it changes many lines, (716 lines changed) Show them Hide them
@@ -7,9 +7,6 b''
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 import struct
11 import sys
12 import threading
13 10
14 11 from .i18n import _
15 12 from .thirdparty import (
@@ -19,279 +16,18 b' from .thirdparty.zope import ('
19 16 interface as zi,
20 17 )
21 18 from . import (
22 encoding,
23 19 error,
24 hook,
25 20 pycompat,
26 util,
27 21 wireproto,
28 22 wireprotoframing,
29 23 wireprototypes,
30 24 )
31 from .utils import (
32 procutil,
33 )
34 25
35 stringio = util.stringio
36
37 urlerr = util.urlerr
38 urlreq = util.urlreq
39
40 HTTP_OK = 200
41
42 HGTYPE = 'application/mercurial-0.1'
43 HGTYPE2 = 'application/mercurial-0.2'
44 HGERRTYPE = 'application/hg-error'
45 26 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
46 27
47 28 HTTPV2 = wireprototypes.HTTPV2
48 SSHV1 = wireprototypes.SSHV1
49 SSHV2 = wireprototypes.SSHV2
50 29
51 def decodevaluefromheaders(req, headerprefix):
52 """Decode a long value from multiple HTTP request headers.
53
54 Returns the value as a bytes, not a str.
55 """
56 chunks = []
57 i = 1
58 while True:
59 v = req.headers.get(b'%s-%d' % (headerprefix, i))
60 if v is None:
61 break
62 chunks.append(pycompat.bytesurl(v))
63 i += 1
64
65 return ''.join(chunks)
66
67 @zi.implementer(wireprototypes.baseprotocolhandler)
68 class httpv1protocolhandler(object):
69 def __init__(self, req, ui, checkperm):
70 self._req = req
71 self._ui = ui
72 self._checkperm = checkperm
73 self._protocaps = None
74
75 @property
76 def name(self):
77 return 'http-v1'
78
79 def getargs(self, args):
80 knownargs = self._args()
81 data = {}
82 keys = args.split()
83 for k in keys:
84 if k == '*':
85 star = {}
86 for key in knownargs.keys():
87 if key != 'cmd' and key not in keys:
88 star[key] = knownargs[key][0]
89 data['*'] = star
90 else:
91 data[k] = knownargs[k][0]
92 return [data[k] for k in keys]
93
94 def _args(self):
95 args = self._req.qsparams.asdictoflists()
96 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
97 if postlen:
98 args.update(urlreq.parseqs(
99 self._req.bodyfh.read(postlen), keep_blank_values=True))
100 return args
101
102 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
103 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
104 return args
105
106 def getprotocaps(self):
107 if self._protocaps is None:
108 value = decodevaluefromheaders(self._req, r'X-HgProto')
109 self._protocaps = set(value.split(' '))
110 return self._protocaps
111
112 def getpayload(self):
113 # Existing clients *always* send Content-Length.
114 length = int(self._req.headers[b'Content-Length'])
115
116 # If httppostargs is used, we need to read Content-Length
117 # minus the amount that was consumed by args.
118 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
119 return util.filechunkiter(self._req.bodyfh, limit=length)
120
121 @contextlib.contextmanager
122 def mayberedirectstdio(self):
123 oldout = self._ui.fout
124 olderr = self._ui.ferr
125
126 out = util.stringio()
127
128 try:
129 self._ui.fout = out
130 self._ui.ferr = out
131 yield out
132 finally:
133 self._ui.fout = oldout
134 self._ui.ferr = olderr
135
136 def client(self):
137 return 'remote:%s:%s:%s' % (
138 self._req.urlscheme,
139 urlreq.quote(self._req.remotehost or ''),
140 urlreq.quote(self._req.remoteuser or ''))
141
142 def addcapabilities(self, repo, caps):
143 caps.append(b'batch')
144
145 caps.append('httpheader=%d' %
146 repo.ui.configint('server', 'maxhttpheaderlen'))
147 if repo.ui.configbool('experimental', 'httppostargs'):
148 caps.append('httppostargs')
149
150 # FUTURE advertise 0.2rx once support is implemented
151 # FUTURE advertise minrx and mintx after consulting config option
152 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
153
154 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
155 if compengines:
156 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
157 for e in compengines)
158 caps.append('compression=%s' % comptypes)
159
160 return caps
161
162 def checkperm(self, perm):
163 return self._checkperm(perm)
164
165 # This method exists mostly so that extensions like remotefilelog can
166 # disable a kludgey legacy method only over http. As of early 2018,
167 # there are no other known users, so with any luck we can discard this
168 # hook if remotefilelog becomes a first-party extension.
169 def iscmd(cmd):
170 return cmd in wireproto.commands
171
172 def handlewsgirequest(rctx, req, res, checkperm):
173 """Possibly process a wire protocol request.
174
175 If the current request is a wire protocol request, the request is
176 processed by this function.
177
178 ``req`` is a ``parsedrequest`` instance.
179 ``res`` is a ``wsgiresponse`` instance.
180
181 Returns a bool indicating if the request was serviced. If set, the caller
182 should stop processing the request, as a response has already been issued.
183 """
184 # Avoid cycle involving hg module.
185 from .hgweb import common as hgwebcommon
186
187 repo = rctx.repo
188
189 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
190 # string parameter. If it isn't present, this isn't a wire protocol
191 # request.
192 if 'cmd' not in req.qsparams:
193 return False
194
195 cmd = req.qsparams['cmd']
196
197 # The "cmd" request parameter is used by both the wire protocol and hgweb.
198 # While not all wire protocol commands are available for all transports,
199 # if we see a "cmd" value that resembles a known wire protocol command, we
200 # route it to a protocol handler. This is better than routing possible
201 # wire protocol requests to hgweb because it prevents hgweb from using
202 # known wire protocol commands and it is less confusing for machine
203 # clients.
204 if not iscmd(cmd):
205 return False
206
207 # The "cmd" query string argument is only valid on the root path of the
208 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
209 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
210 # in this case. We send an HTTP 404 for backwards compatibility reasons.
211 if req.dispatchpath:
212 res.status = hgwebcommon.statusmessage(404)
213 res.headers['Content-Type'] = HGTYPE
214 # TODO This is not a good response to issue for this request. This
215 # is mostly for BC for now.
216 res.setbodybytes('0\n%s\n' % b'Not Found')
217 return True
218
219 proto = httpv1protocolhandler(req, repo.ui,
220 lambda perm: checkperm(rctx, req, perm))
221
222 # The permissions checker should be the only thing that can raise an
223 # ErrorResponse. It is kind of a layer violation to catch an hgweb
224 # exception here. So consider refactoring into a exception type that
225 # is associated with the wire protocol.
226 try:
227 _callhttp(repo, req, res, proto, cmd)
228 except hgwebcommon.ErrorResponse as e:
229 for k, v in e.headers:
230 res.headers[k] = v
231 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
232 # TODO This response body assumes the failed command was
233 # "unbundle." That assumption is not always valid.
234 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
235
236 return True
237
238 def handlewsgiapirequest(rctx, req, res, checkperm):
239 """Handle requests to /api/*."""
240 assert req.dispatchparts[0] == b'api'
241
242 repo = rctx.repo
243
244 # This whole URL space is experimental for now. But we want to
245 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
246 if not repo.ui.configbool('experimental', 'web.apiserver'):
247 res.status = b'404 Not Found'
248 res.headers[b'Content-Type'] = b'text/plain'
249 res.setbodybytes(_('Experimental API server endpoint not enabled'))
250 return
251
252 # The URL space is /api/<protocol>/*. The structure of URLs under varies
253 # by <protocol>.
254
255 # Registered APIs are made available via config options of the name of
256 # the protocol.
257 availableapis = set()
258 for k, v in API_HANDLERS.items():
259 section, option = v['config']
260 if repo.ui.configbool(section, option):
261 availableapis.add(k)
262
263 # Requests to /api/ list available APIs.
264 if req.dispatchparts == [b'api']:
265 res.status = b'200 OK'
266 res.headers[b'Content-Type'] = b'text/plain'
267 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
268 'one of the following:\n')]
269 if availableapis:
270 lines.extend(sorted(availableapis))
271 else:
272 lines.append(_('(no available APIs)\n'))
273 res.setbodybytes(b'\n'.join(lines))
274 return
275
276 proto = req.dispatchparts[1]
277
278 if proto not in API_HANDLERS:
279 res.status = b'404 Not Found'
280 res.headers[b'Content-Type'] = b'text/plain'
281 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
282 proto, b', '.join(sorted(availableapis))))
283 return
284
285 if proto not in availableapis:
286 res.status = b'404 Not Found'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('API %s not enabled\n') % proto)
289 return
290
291 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
292 req.dispatchparts[2:])
293
294 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
30 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
295 31 from .hgweb import common as hgwebcommon
296 32
297 33 # URL space looks like: <permissions>/<command>, where <permission> can
@@ -585,14 +321,6 b' def _httpv2runcommand(ui, repo, req, res'
585 321 raise error.ProgrammingError('unhandled event from reactor: %s' %
586 322 action)
587 323
588 # Maps API name to metadata so custom API can be registered.
589 API_HANDLERS = {
590 HTTPV2: {
591 'config': ('experimental', 'web.api.http-v2'),
592 'handler': _handlehttpv2request,
593 },
594 }
595
596 324 @zi.implementer(wireprototypes.baseprotocolhandler)
597 325 class httpv2protocolhandler(object):
598 326 def __init__(self, req, ui, args=None):
@@ -634,445 +362,3 b' class httpv2protocolhandler(object):'
634 362
635 363 def checkperm(self, perm):
636 364 raise NotImplementedError
637
638 def _httpresponsetype(ui, proto, prefer_uncompressed):
639 """Determine the appropriate response type and compression settings.
640
641 Returns a tuple of (mediatype, compengine, engineopts).
642 """
643 # Determine the response media type and compression engine based
644 # on the request parameters.
645
646 if '0.2' in proto.getprotocaps():
647 # All clients are expected to support uncompressed data.
648 if prefer_uncompressed:
649 return HGTYPE2, util._noopengine(), {}
650
651 # Now find an agreed upon compression format.
652 compformats = wireproto.clientcompressionsupport(proto)
653 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
654 if engine.wireprotosupport().name in compformats:
655 opts = {}
656 level = ui.configint('server', '%slevel' % engine.name())
657 if level is not None:
658 opts['level'] = level
659
660 return HGTYPE2, engine, opts
661
662 # No mutually supported compression format. Fall back to the
663 # legacy protocol.
664
665 # Don't allow untrusted settings because disabling compression or
666 # setting a very high compression level could lead to flooding
667 # the server's network or CPU.
668 opts = {'level': ui.configint('server', 'zliblevel')}
669 return HGTYPE, util.compengines['zlib'], opts
670
671 def _callhttp(repo, req, res, proto, cmd):
672 # Avoid cycle involving hg module.
673 from .hgweb import common as hgwebcommon
674
675 def genversion2(gen, engine, engineopts):
676 # application/mercurial-0.2 always sends a payload header
677 # identifying the compression engine.
678 name = engine.wireprotosupport().name
679 assert 0 < len(name) < 256
680 yield struct.pack('B', len(name))
681 yield name
682
683 for chunk in gen:
684 yield chunk
685
686 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
687 if code == HTTP_OK:
688 res.status = '200 Script output follows'
689 else:
690 res.status = hgwebcommon.statusmessage(code)
691
692 res.headers['Content-Type'] = contenttype
693
694 if bodybytes is not None:
695 res.setbodybytes(bodybytes)
696 if bodygen is not None:
697 res.setbodygen(bodygen)
698
699 if not wireproto.commands.commandavailable(cmd, proto):
700 setresponse(HTTP_OK, HGERRTYPE,
701 _('requested wire protocol command is not available over '
702 'HTTP'))
703 return
704
705 proto.checkperm(wireproto.commands[cmd].permission)
706
707 rsp = wireproto.dispatch(repo, proto, cmd)
708
709 if isinstance(rsp, bytes):
710 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
711 elif isinstance(rsp, wireprototypes.bytesresponse):
712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
713 elif isinstance(rsp, wireprototypes.streamreslegacy):
714 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
715 elif isinstance(rsp, wireprototypes.streamres):
716 gen = rsp.gen
717
718 # This code for compression should not be streamres specific. It
719 # is here because we only compress streamres at the moment.
720 mediatype, engine, engineopts = _httpresponsetype(
721 repo.ui, proto, rsp.prefer_uncompressed)
722 gen = engine.compressstream(gen, engineopts)
723
724 if mediatype == HGTYPE2:
725 gen = genversion2(gen, engine, engineopts)
726
727 setresponse(HTTP_OK, mediatype, bodygen=gen)
728 elif isinstance(rsp, wireprototypes.pushres):
729 rsp = '%d\n%s' % (rsp.res, rsp.output)
730 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
731 elif isinstance(rsp, wireprototypes.pusherr):
732 rsp = '0\n%s\n' % rsp.res
733 res.drain = True
734 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
735 elif isinstance(rsp, wireprototypes.ooberror):
736 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
737 else:
738 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
739
740 def _sshv1respondbytes(fout, value):
741 """Send a bytes response for protocol version 1."""
742 fout.write('%d\n' % len(value))
743 fout.write(value)
744 fout.flush()
745
746 def _sshv1respondstream(fout, source):
747 write = fout.write
748 for chunk in source.gen:
749 write(chunk)
750 fout.flush()
751
752 def _sshv1respondooberror(fout, ferr, rsp):
753 ferr.write(b'%s\n-\n' % rsp)
754 ferr.flush()
755 fout.write(b'\n')
756 fout.flush()
757
758 @zi.implementer(wireprototypes.baseprotocolhandler)
759 class sshv1protocolhandler(object):
760 """Handler for requests services via version 1 of SSH protocol."""
761 def __init__(self, ui, fin, fout):
762 self._ui = ui
763 self._fin = fin
764 self._fout = fout
765 self._protocaps = set()
766
767 @property
768 def name(self):
769 return wireprototypes.SSHV1
770
771 def getargs(self, args):
772 data = {}
773 keys = args.split()
774 for n in xrange(len(keys)):
775 argline = self._fin.readline()[:-1]
776 arg, l = argline.split()
777 if arg not in keys:
778 raise error.Abort(_("unexpected parameter %r") % arg)
779 if arg == '*':
780 star = {}
781 for k in xrange(int(l)):
782 argline = self._fin.readline()[:-1]
783 arg, l = argline.split()
784 val = self._fin.read(int(l))
785 star[arg] = val
786 data['*'] = star
787 else:
788 val = self._fin.read(int(l))
789 data[arg] = val
790 return [data[k] for k in keys]
791
792 def getprotocaps(self):
793 return self._protocaps
794
795 def getpayload(self):
796 # We initially send an empty response. This tells the client it is
797 # OK to start sending data. If a client sees any other response, it
798 # interprets it as an error.
799 _sshv1respondbytes(self._fout, b'')
800
801 # The file is in the form:
802 #
803 # <chunk size>\n<chunk>
804 # ...
805 # 0\n
806 count = int(self._fin.readline())
807 while count:
808 yield self._fin.read(count)
809 count = int(self._fin.readline())
810
811 @contextlib.contextmanager
812 def mayberedirectstdio(self):
813 yield None
814
815 def client(self):
816 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
817 return 'remote:ssh:' + client
818
819 def addcapabilities(self, repo, caps):
820 if self.name == wireprototypes.SSHV1:
821 caps.append(b'protocaps')
822 caps.append(b'batch')
823 return caps
824
825 def checkperm(self, perm):
826 pass
827
828 class sshv2protocolhandler(sshv1protocolhandler):
829 """Protocol handler for version 2 of the SSH protocol."""
830
831 @property
832 def name(self):
833 return wireprototypes.SSHV2
834
835 def addcapabilities(self, repo, caps):
836 return caps
837
838 def _runsshserver(ui, repo, fin, fout, ev):
839 # This function operates like a state machine of sorts. The following
840 # states are defined:
841 #
842 # protov1-serving
843 # Server is in protocol version 1 serving mode. Commands arrive on
844 # new lines. These commands are processed in this state, one command
845 # after the other.
846 #
847 # protov2-serving
848 # Server is in protocol version 2 serving mode.
849 #
850 # upgrade-initial
851 # The server is going to process an upgrade request.
852 #
853 # upgrade-v2-filter-legacy-handshake
854 # The protocol is being upgraded to version 2. The server is expecting
855 # the legacy handshake from version 1.
856 #
857 # upgrade-v2-finish
858 # The upgrade to version 2 of the protocol is imminent.
859 #
860 # shutdown
861 # The server is shutting down, possibly in reaction to a client event.
862 #
863 # And here are their transitions:
864 #
865 # protov1-serving -> shutdown
866 # When server receives an empty request or encounters another
867 # error.
868 #
869 # protov1-serving -> upgrade-initial
870 # An upgrade request line was seen.
871 #
872 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
873 # Upgrade to version 2 in progress. Server is expecting to
874 # process a legacy handshake.
875 #
876 # upgrade-v2-filter-legacy-handshake -> shutdown
877 # Client did not fulfill upgrade handshake requirements.
878 #
879 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
880 # Client fulfilled version 2 upgrade requirements. Finishing that
881 # upgrade.
882 #
883 # upgrade-v2-finish -> protov2-serving
884 # Protocol upgrade to version 2 complete. Server can now speak protocol
885 # version 2.
886 #
887 # protov2-serving -> protov1-serving
888 # Ths happens by default since protocol version 2 is the same as
889 # version 1 except for the handshake.
890
891 state = 'protov1-serving'
892 proto = sshv1protocolhandler(ui, fin, fout)
893 protoswitched = False
894
895 while not ev.is_set():
896 if state == 'protov1-serving':
897 # Commands are issued on new lines.
898 request = fin.readline()[:-1]
899
900 # Empty lines signal to terminate the connection.
901 if not request:
902 state = 'shutdown'
903 continue
904
905 # It looks like a protocol upgrade request. Transition state to
906 # handle it.
907 if request.startswith(b'upgrade '):
908 if protoswitched:
909 _sshv1respondooberror(fout, ui.ferr,
910 b'cannot upgrade protocols multiple '
911 b'times')
912 state = 'shutdown'
913 continue
914
915 state = 'upgrade-initial'
916 continue
917
918 available = wireproto.commands.commandavailable(request, proto)
919
920 # This command isn't available. Send an empty response and go
921 # back to waiting for a new command.
922 if not available:
923 _sshv1respondbytes(fout, b'')
924 continue
925
926 rsp = wireproto.dispatch(repo, proto, request)
927
928 if isinstance(rsp, bytes):
929 _sshv1respondbytes(fout, rsp)
930 elif isinstance(rsp, wireprototypes.bytesresponse):
931 _sshv1respondbytes(fout, rsp.data)
932 elif isinstance(rsp, wireprototypes.streamres):
933 _sshv1respondstream(fout, rsp)
934 elif isinstance(rsp, wireprototypes.streamreslegacy):
935 _sshv1respondstream(fout, rsp)
936 elif isinstance(rsp, wireprototypes.pushres):
937 _sshv1respondbytes(fout, b'')
938 _sshv1respondbytes(fout, b'%d' % rsp.res)
939 elif isinstance(rsp, wireprototypes.pusherr):
940 _sshv1respondbytes(fout, rsp.res)
941 elif isinstance(rsp, wireprototypes.ooberror):
942 _sshv1respondooberror(fout, ui.ferr, rsp.message)
943 else:
944 raise error.ProgrammingError('unhandled response type from '
945 'wire protocol command: %s' % rsp)
946
947 # For now, protocol version 2 serving just goes back to version 1.
948 elif state == 'protov2-serving':
949 state = 'protov1-serving'
950 continue
951
952 elif state == 'upgrade-initial':
953 # We should never transition into this state if we've switched
954 # protocols.
955 assert not protoswitched
956 assert proto.name == wireprototypes.SSHV1
957
958 # Expected: upgrade <token> <capabilities>
959 # If we get something else, the request is malformed. It could be
960 # from a future client that has altered the upgrade line content.
961 # We treat this as an unknown command.
962 try:
963 token, caps = request.split(b' ')[1:]
964 except ValueError:
965 _sshv1respondbytes(fout, b'')
966 state = 'protov1-serving'
967 continue
968
969 # Send empty response if we don't support upgrading protocols.
970 if not ui.configbool('experimental', 'sshserver.support-v2'):
971 _sshv1respondbytes(fout, b'')
972 state = 'protov1-serving'
973 continue
974
975 try:
976 caps = urlreq.parseqs(caps)
977 except ValueError:
978 _sshv1respondbytes(fout, b'')
979 state = 'protov1-serving'
980 continue
981
982 # We don't see an upgrade request to protocol version 2. Ignore
983 # the upgrade request.
984 wantedprotos = caps.get(b'proto', [b''])[0]
985 if SSHV2 not in wantedprotos:
986 _sshv1respondbytes(fout, b'')
987 state = 'protov1-serving'
988 continue
989
990 # It looks like we can honor this upgrade request to protocol 2.
991 # Filter the rest of the handshake protocol request lines.
992 state = 'upgrade-v2-filter-legacy-handshake'
993 continue
994
995 elif state == 'upgrade-v2-filter-legacy-handshake':
996 # Client should have sent legacy handshake after an ``upgrade``
997 # request. Expected lines:
998 #
999 # hello
1000 # between
1001 # pairs 81
1002 # 0000...-0000...
1003
1004 ok = True
1005 for line in (b'hello', b'between', b'pairs 81'):
1006 request = fin.readline()[:-1]
1007
1008 if request != line:
1009 _sshv1respondooberror(fout, ui.ferr,
1010 b'malformed handshake protocol: '
1011 b'missing %s' % line)
1012 ok = False
1013 state = 'shutdown'
1014 break
1015
1016 if not ok:
1017 continue
1018
1019 request = fin.read(81)
1020 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
1021 _sshv1respondooberror(fout, ui.ferr,
1022 b'malformed handshake protocol: '
1023 b'missing between argument value')
1024 state = 'shutdown'
1025 continue
1026
1027 state = 'upgrade-v2-finish'
1028 continue
1029
1030 elif state == 'upgrade-v2-finish':
1031 # Send the upgrade response.
1032 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1033 servercaps = wireproto.capabilities(repo, proto)
1034 rsp = b'capabilities: %s' % servercaps.data
1035 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1036 fout.flush()
1037
1038 proto = sshv2protocolhandler(ui, fin, fout)
1039 protoswitched = True
1040
1041 state = 'protov2-serving'
1042 continue
1043
1044 elif state == 'shutdown':
1045 break
1046
1047 else:
1048 raise error.ProgrammingError('unhandled ssh server state: %s' %
1049 state)
1050
1051 class sshserver(object):
1052 def __init__(self, ui, repo, logfh=None):
1053 self._ui = ui
1054 self._repo = repo
1055 self._fin = ui.fin
1056 self._fout = ui.fout
1057
1058 # Log write I/O to stdout and stderr if configured.
1059 if logfh:
1060 self._fout = util.makeloggingfileobject(
1061 logfh, self._fout, 'o', logdata=True)
1062 ui.ferr = util.makeloggingfileobject(
1063 logfh, ui.ferr, 'e', logdata=True)
1064
1065 hook.redirect(True)
1066 ui.fout = repo.ui.fout = ui.ferr
1067
1068 # Prevent insertion/deletion of CRs
1069 procutil.setbinary(self._fin)
1070 procutil.setbinary(self._fout)
1071
1072 def serve_forever(self):
1073 self.serveuntil(threading.Event())
1074 sys.exit(0)
1075
1076 def serveuntil(self, ev):
1077 """Serve until a threading.Event is set."""
1078 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -23,6 +23,7 b' from mercurial import ('
23 23 vfs as vfsmod,
24 24 wireprotoserver,
25 25 wireprototypes,
26 wireprotov2server,
26 27 )
27 28
28 29 rootdir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
@@ -125,7 +126,7 b' def main():'
125 126 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
126 127 wireprotoserver.httpv1protocolhandler)
127 128 ziverify.verifyClass(wireprototypes.baseprotocolhandler,
128 wireprotoserver.httpv2protocolhandler)
129 wireprotov2server.httpv2protocolhandler)
129 130
130 131 sshv1 = wireprotoserver.sshv1protocolhandler(None, None, None)
131 132 checkzobject(sshv1)
@@ -134,7 +135,7 b' def main():'
134 135
135 136 httpv1 = wireprotoserver.httpv1protocolhandler(None, None, None)
136 137 checkzobject(httpv1)
137 httpv2 = wireprotoserver.httpv2protocolhandler(None, None)
138 httpv2 = wireprotov2server.httpv2protocolhandler(None, None)
138 139 checkzobject(httpv2)
139 140
140 141 ziverify.verifyClass(repository.ifilestorage, filelog.filelog)
General Comments 0
You need to be logged in to leave comments. Login now