##// END OF EJS Templates
wireprotoserver: rename webproto to httpv1protocolhandler...
Gregory Szorc -
r36240:6ba5b03f default
parent child Browse files
Show More
@@ -1,644 +1,644
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import contextlib
11 11 import struct
12 12 import sys
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 wireprototypes,
23 23 )
24 24
25 25 stringio = util.stringio
26 26
27 27 urlerr = util.urlerr
28 28 urlreq = util.urlreq
29 29
# HTTP status sent with responses to dispatched wire protocol commands.
HTTP_OK = 200

# Media types used for wire protocol HTTP responses. The 0.2 type always
# carries a payload header naming the compression engine; the error type
# is used for out-of-band error messages.
HGTYPE = 'application/mercurial-0.1'
HGTYPE2 = 'application/mercurial-0.2'
HGERRTYPE = 'application/hg-error'

# Names of the SSH protocol implementations.
SSHV1 = 'ssh-v1'
# This name is advertised over the wire. Increment the counter at the end
# to reflect backwards-compatibility breakages.
SSHV2 = 'exp-ssh-v2-0001'
41 41
class baseprotocolhandler(object):
    """Interface every wire protocol handler has to implement.

    A handler sits between the wire protocol command implementations and
    a concrete transport. Through it, commands read their arguments,
    receive request payloads, optionally capture stdio for the duration
    of a request, and identify the connected client.
    """

    # Python 2 spelling of an abstract base class declaration.
    __metaclass__ = abc.ABCMeta

    @abc.abstractproperty
    def name(self):
        """Name of this protocol implementation.

        The value uniquely identifies the transport type.
        """

    @abc.abstractmethod
    def getargs(self, args):
        """Look up the values of the arguments named in ``args``.

        Returns a list of values in the same order as the names appear
        in ``args``.
        """

    @abc.abstractmethod
    def forwardpayload(self, fp):
        """Copy the raw request payload into the file object ``fp``.

        The whole payload has been read by the time this returns.
        """

    @abc.abstractmethod
    def mayberedirectstdio(self):
        """Return a context manager that may capture stdio output.

        While active, the context manager yields a file-like object that
        collects stdout and stderr output, or ``None`` when no
        redirection takes place.

        Capturing allows the output to be included in the response.
        Transports able to stream stdio to the client in real time do
        not capture anything.
        """

    @abc.abstractmethod
    def client(self):
        """Return a string (as bytes) describing the connected client."""
90 90
def decodevaluefromheaders(req, headerprefix):
    """Reassemble a value that was split over numbered request headers.

    The pieces are read from ``req.env`` under the keys
    ``HTTP_<PREFIX>_1``, ``HTTP_<PREFIX>_2``, ... where ``<PREFIX>`` is
    ``headerprefix`` upper-cased with dashes turned into underscores.
    Reading stops at the first missing number.

    Returns the concatenated value as bytes, not a str.
    """
    envprefix = headerprefix.upper().replace(r'-', r'_')
    pieces = []
    index = 1
    value = req.env.get(r'HTTP_%s_%d' % (envprefix, index))
    while value is not None:
        pieces.append(pycompat.bytesurl(value))
        index += 1
        value = req.env.get(r'HTTP_%s_%d' % (envprefix, index))

    return ''.join(pieces)
107 107
class httpv1protocolhandler(baseprotocolhandler):
    """Protocol handler for version 1 of the HTTP wire protocol.

    Wraps a request object (``req``) and a ``ui`` instance.
    """

    def __init__(self, req, ui):
        self._req = req
        self._ui = ui

    @property
    def name(self):
        return 'http'

    def getargs(self, args):
        """Return the values of the space-separated names in ``args``.

        The special name ``*`` collects, into a dict, every received
        argument other than ``cmd`` that was not asked for by name.
        """
        received = self._args()
        wanted = args.split()
        values = {}
        for name in wanted:
            if name != '*':
                values[name] = received[name][0]
                continue
            values['*'] = dict(
                (other, received[other][0])
                for other in received.keys()
                if other != 'cmd' and other not in wanted)
        return [values[name] for name in wanted]

    def _args(self):
        """Collect the command arguments sent with the request.

        Starts from a copy of the request form. Additional arguments are
        parsed either from the first ``X-HgArgs-Post`` bytes of the
        request body, when that header holds a non-zero length, or else
        from the ``X-HgArg-<N>`` headers.
        """
        merged = util.rapply(pycompat.bytesurl, self._req.form.copy())
        postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
        if postlen:
            encoded = self._req.read(postlen)
        else:
            encoded = decodevaluefromheaders(self._req, r'X-HgArg')
        merged.update(urlreq.parseqs(encoded, keep_blank_values=True))
        return merged

    def forwardpayload(self, fp):
        """Copy the request body into ``fp``."""
        env = self._req.env
        # With httppostargs, part of the body was already consumed as
        # arguments, so only Content-Length minus that amount remains.
        remaining = (int(env[r'CONTENT_LENGTH']) -
                     int(env.get(r'HTTP_X_HGARGS_POST', 0)))
        for chunk in util.filechunkiter(self._req, limit=remaining):
            fp.write(chunk)

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        """Point ``ui.fout`` and ``ui.ferr`` at one buffer and yield it.

        The previous streams are put back when the context exits, even
        if the body raised.
        """
        ui = self._ui
        savedout, savederr = ui.fout, ui.ferr
        captured = util.stringio()

        try:
            ui.fout = captured
            ui.ferr = captured
            yield captured
        finally:
            ui.fout = savedout
            ui.ferr = savederr

    def client(self):
        """Describe the client as ``remote:<scheme>:<host>:<user>``."""
        env = self._req.env
        scheme = env.get('wsgi.url_scheme') or 'http'
        host = urlreq.quote(env.get('REMOTE_HOST', ''))
        user = urlreq.quote(env.get('REMOTE_USER', ''))
        return 'remote:%s:%s:%s' % (scheme, host, user)
172 172
def iscmd(cmd):
    """Report whether ``cmd`` is present in ``wireproto.commands``."""
    known = cmd in wireproto.commands
    return known
175 175
def parsehttprequest(repo, req, query):
    """Decide whether ``req`` is a wire protocol request and describe it.

    ``req`` is a ``wsgirequest`` instance.

    Returns ``None`` when the request is not a wire protocol request.
    Otherwise returns a dict with these keys:

    ``cmd``
       The requested command name.
    ``proto``
       An ``httpv1protocolhandler`` bound to the request.
    ``dispatch``
       Callable taking no arguments that runs the command.
    ``handleerror``
       Callable taking an exception that sends the error response.
    """
    # Version 1 HTTP wire protocol requests always carry a "cmd" query
    # string parameter. Without one this cannot be such a request.
    if r'cmd' not in req.form:
        return None

    cmd = pycompat.sysbytes(req.form[r'cmd'][0])

    # hgweb uses the "cmd" parameter too. Any value naming a known wire
    # protocol command is claimed for the wire protocol, even if the
    # command is not available on every transport: this keeps hgweb from
    # reusing those names and is less confusing for machine clients.
    if cmd not in wireproto.commands:
        return None

    proto = httpv1protocolhandler(req, repo.ui)

    def dispatch():
        return _callhttp(repo, req, proto, cmd)

    def handleerror(ex):
        return _handlehttperror(ex, req, cmd)

    return {
        'cmd': cmd,
        'proto': proto,
        'dispatch': dispatch,
        'handleerror': handleerror,
    }
212 212
def _httpresponsetype(ui, req, prefer_uncompressed):
    """Pick the response media type and compression for a request.

    The choice is driven by the capabilities the client sent in its
    ``X-HgProto-<N>`` headers.

    Returns a tuple of (mediatype, compengine, engineopts).
    """
    protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')

    if '0.2' in protocaps:
        # Every client is expected to cope with uncompressed data.
        if prefer_uncompressed:
            return HGTYPE2, util._noopengine(), {}

        # Formats assumed when the client does not list any; this is the
        # default defined by the wire protocol spec.
        accepted = ['zlib', 'none']
        for cap in protocaps:
            if not cap.startswith('comp='):
                continue
            accepted = cap[5:].split(',')
            break

        # Use the first server engine the client also accepts.
        for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
            if engine.wireprotosupport().name not in accepted:
                continue
            level = ui.configint('server', '%slevel' % engine.name())
            opts = {} if level is None else {'level': level}
            return HGTYPE2, engine, opts

        # Nothing in common with the client: fall through to the legacy
        # protocol below.

    # Untrusted settings are not allowed here because disabling
    # compression or choosing a very high level could flood the server's
    # network or CPU.
    opts = {'level': ui.configint('server', 'zliblevel')}
    return HGTYPE, util.compengines['zlib'], opts
252 252
def _callhttp(repo, req, proto, cmd):
    """Run wire protocol command ``cmd`` and start its HTTP response.

    ``req`` is the request object used to send the response, ``proto``
    the protocol handler for the request.

    Returns an iterable with the remaining response body: an empty list
    when the body was handed to ``req.respond()`` directly, or a
    generator when the body is streamed.

    Raises ``error.ProgrammingError`` when the command returned a
    response type this function does not know how to send.
    """
    def genversion2(gen, engine, engineopts):
        # application/mercurial-0.2 always sends a payload header
        # identifying the compression engine.
        name = engine.wireprotosupport().name
        assert 0 < len(name) < 256
        yield struct.pack('B', len(name))
        yield name

        for chunk in gen:
            yield chunk

    # Refuse commands that are not exposed over this transport *before*
    # running them, so that a rejected command never executes.
    if not wireproto.commands.commandavailable(cmd, proto):
        req.respond(HTTP_OK, HGERRTYPE,
                    body=_('requested wire protocol command is not available '
                           'over HTTP'))
        return []

    rsp = wireproto.dispatch(repo, proto, cmd)

    if isinstance(rsp, bytes):
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireprototypes.bytesresponse):
        req.respond(HTTP_OK, HGTYPE, body=rsp.data)
        return []
    elif isinstance(rsp, wireprototypes.streamreslegacy):
        gen = rsp.gen
        req.respond(HTTP_OK, HGTYPE)
        return gen
    elif isinstance(rsp, wireprototypes.streamres):
        gen = rsp.gen

        # This code for compression should not be streamres specific. It
        # is here because we only compress streamres at the moment.
        mediatype, engine, engineopts = _httpresponsetype(
            repo.ui, req, rsp.prefer_uncompressed)
        gen = engine.compressstream(gen, engineopts)

        if mediatype == HGTYPE2:
            gen = genversion2(gen, engine, engineopts)

        req.respond(HTTP_OK, mediatype)
        return gen
    elif isinstance(rsp, wireprototypes.pushres):
        rsp = '%d\n%s' % (rsp.res, rsp.output)
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireprototypes.pusherr):
        # This is the httplib workaround documented in _handlehttperror().
        req.drain()

        rsp = '0\n%s\n' % rsp.res
        req.respond(HTTP_OK, HGTYPE, body=rsp)
        return []
    elif isinstance(rsp, wireprototypes.ooberror):
        rsp = rsp.message
        req.respond(HTTP_OK, HGERRTYPE, body=rsp)
        return []
    raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
313 313
def _handlehttperror(e, req, cmd):
    """Called when an ErrorResponse is raised during HTTP request processing.

    Sends ``e`` as the response status with a body of ``0\\n<e>\\n`` and
    returns an empty string. ``cmd`` is currently unused.
    """

    # Clients using Python's httplib are stateful: the HTTP client
    # won't process an HTTP response until all request data is
    # sent to the server. The intent of this code is to ensure
    # we always read HTTP request data from the client, thus
    # ensuring httplib transitions to a state that allows it to read
    # the HTTP response. In other words, it helps prevent deadlocks
    # on clients using httplib.

    # Drain a POST body, but not if Expect: 100-continue is being used.
    ispost = req.env[r'REQUEST_METHOD'] == r'POST'
    expectscontinue = (
        req.env.get('HTTP_EXPECT', '').lower() == '100-continue')
    # Also drain when the client advertises the non-httplib HTTP library.
    # A WSGI environ exposes request headers under ``HTTP_*`` keys, so the
    # ``X-HgHttp2`` header has to be looked up as ``HTTP_X_HGHTTP2``.
    advertiseshttp2 = req.env.get(r'HTTP_X_HGHTTP2', '')

    if (ispost and not expectscontinue) or advertiseshttp2:
        req.drain()
    else:
        req.headers.append((r'Connection', r'Close'))

    # TODO This response body assumes the failed command was
    # "unbundle." That assumption is not always valid.
    req.respond(e, HGTYPE, body='0\n%s\n' % e)

    return ''
341 341
def _sshv1respondbytes(fout, value):
    """Write ``value`` to ``fout`` as a protocol version 1 bytes response.

    The response is the decimal length of ``value`` on a line of its own
    followed by ``value`` itself. ``fout`` is flushed afterwards.
    """
    header = '%d\n' % len(value)
    for piece in (header, value):
        fout.write(piece)
    fout.flush()
347 347
def _sshv1respondstream(fout, source):
    """Write every chunk produced by ``source.gen`` to ``fout``, then flush."""
    for piece in source.gen:
        fout.write(piece)
    fout.flush()
353 353
def _sshv1respondooberror(fout, ferr, rsp):
    """Report the out-of-band error message ``rsp``.

    The message, followed by a line holding a single ``-``, is written
    to ``ferr``. A lone newline is then written to ``fout``. Each stream
    is flushed right after it is written.
    """
    message = b'%s\n-\n' % rsp
    ferr.write(message)
    ferr.flush()

    fout.write(b'\n')
    fout.flush()
359 359
class sshv1protocolhandler(baseprotocolhandler):
    """Handler for requests serviced via version 1 of the SSH protocol.

    ``fin`` and ``fout`` are the file objects the protocol is read from
    and written to.
    """

    def __init__(self, ui, fin, fout):
        self._ui = ui
        self._fin = fin
        self._fout = fout

    @property
    def name(self):
        return SSHV1

    def getargs(self, args):
        """Read the arguments named in ``args`` from the input stream.

        One ``<name> <length>`` line followed by ``<length>`` bytes of
        value is read per requested name. For the name ``*`` the number
        is instead a count of nested ``<name> <length>`` entries, which
        are gathered into a dict.

        Raises ``error.Abort`` when a name arrives that was not asked for.
        """
        fin = self._fin
        wanted = args.split()
        values = {}
        # Exactly one top-level argument is read per requested name.
        for unused in wanted:
            name, size = fin.readline()[:-1].split()
            if name not in wanted:
                raise error.Abort(_("unexpected parameter %r") % name)
            if name != '*':
                values[name] = fin.read(int(size))
                continue

            star = {}
            for unusedstar in xrange(int(size)):
                subname, subsize = fin.readline()[:-1].split()
                star[subname] = fin.read(int(subsize))
            values['*'] = star
        return [values[name] for name in wanted]

    def forwardpayload(self, fpout):
        """Copy the chunked payload from the input stream into ``fpout``."""
        # The payload is in the form:
        #
        #   <chunk size>\n<chunk>
        #   ...
        #   0\n
        #
        # An empty bytes response is sent before any chunk is read.
        _sshv1respondbytes(self._fout, b'')
        while True:
            count = int(self._fin.readline())
            if not count:
                break
            fpout.write(self._fin.read(count))

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        """Yield ``None``: this handler never redirects stdio."""
        yield None

    def client(self):
        """Describe the client as ``remote:ssh:<address>``.

        The address is the first space-separated field of the
        ``SSH_CLIENT`` environment variable, or empty if it is unset.
        """
        address = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
        return 'remote:ssh:' + address
411 411
class sshv2protocolhandler(sshv1protocolhandler):
    """Protocol handler for version 2 of the SSH protocol.

    Nothing is overridden here: every method and property, ``name``
    included, is inherited from ``sshv1protocolhandler``.
    """
414 414
def _runsshserver(ui, repo, fin, fout):
    """Serve SSH wire protocol requests from ``fin`` until shutdown.

    The server is a small state machine with these states:

    protov1-serving
       Protocol version 1 serving mode. Commands arrive on new lines
       and are processed one after the other.

    protov2-serving
       Protocol version 2 serving mode.

    upgrade-initial
       An upgrade request is about to be processed.

    upgrade-v2-filter-legacy-handshake
       The protocol is being upgraded to version 2 and the server
       expects the legacy version 1 handshake.

    upgrade-v2-finish
       The upgrade to version 2 is imminent.

    shutdown
       The server is shutting down, possibly in reaction to a client
       event.

    The transitions between them:

    protov1-serving -> shutdown
       An empty request was received or another error was encountered.

    protov1-serving -> upgrade-initial
       An upgrade request line was seen.

    upgrade-initial -> upgrade-v2-filter-legacy-handshake
       An upgrade to version 2 is in progress and the legacy handshake
       is expected next.

    upgrade-v2-filter-legacy-handshake -> shutdown
       The client did not fulfill the upgrade handshake requirements.

    upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
       The client fulfilled the version 2 upgrade requirements.

    upgrade-v2-finish -> protov2-serving
       The upgrade is complete; the server can speak version 2.

    protov2-serving -> protov1-serving
       Happens by default, since version 2 is the same as version 1
       except for the handshake.
    """
    state = 'protov1-serving'
    proto = sshv1protocolhandler(ui, fin, fout)
    upgraded = False

    # Reaching the 'shutdown' state ends the loop.
    while state != 'shutdown':
        if state == 'protov1-serving':
            # Each command is issued on a line of its own.
            line = fin.readline()[:-1]

            # An empty line is the signal to terminate the connection.
            if not line:
                state = 'shutdown'
                continue

            # This looks like a protocol upgrade request: hand it to the
            # upgrade states, unless an upgrade already took place.
            if line.startswith(b'upgrade '):
                if upgraded:
                    _sshv1respondooberror(fout, ui.ferr,
                                          b'cannot upgrade protocols multiple '
                                          b'times')
                    state = 'shutdown'
                else:
                    state = 'upgrade-initial'
                continue

            # Unavailable commands get an empty response, after which we
            # wait for the next command.
            if not wireproto.commands.commandavailable(line, proto):
                _sshv1respondbytes(fout, b'')
                continue

            rsp = wireproto.dispatch(repo, proto, line)

            if isinstance(rsp, bytes):
                _sshv1respondbytes(fout, rsp)
            elif isinstance(rsp, wireprototypes.bytesresponse):
                _sshv1respondbytes(fout, rsp.data)
            elif isinstance(rsp, (wireprototypes.streamres,
                                  wireprototypes.streamreslegacy)):
                _sshv1respondstream(fout, rsp)
            elif isinstance(rsp, wireprototypes.pushres):
                _sshv1respondbytes(fout, b'')
                _sshv1respondbytes(fout, b'%d' % rsp.res)
            elif isinstance(rsp, wireprototypes.pusherr):
                _sshv1respondbytes(fout, rsp.res)
            elif isinstance(rsp, wireprototypes.ooberror):
                _sshv1respondooberror(fout, ui.ferr, rsp.message)
            else:
                raise error.ProgrammingError('unhandled response type from '
                                             'wire protocol command: %s' % rsp)

        elif state == 'protov2-serving':
            # For now, version 2 serving simply falls back to version 1.
            state = 'protov1-serving'
            continue

        elif state == 'upgrade-initial':
            # This state must never be entered once protocols have been
            # switched.
            assert not upgraded
            assert proto.name == SSHV1

            # Every rejection below sends an empty response and resumes
            # version 1 serving; only a fully acceptable request moves on
            # to the handshake filter.
            state = 'protov1-serving'

            # Expected: upgrade <token> <capabilities>
            # Anything else is malformed, possibly sent by a future client
            # that changed the upgrade line. Treat it as an unknown
            # command.
            try:
                token, caps = line.split(b' ')[1:]
            except ValueError:
                _sshv1respondbytes(fout, b'')
                continue

            # Upgrading protocols is not enabled on this server.
            if not ui.configbool('experimental', 'sshserver.support-v2'):
                _sshv1respondbytes(fout, b'')
                continue

            try:
                caps = urlreq.parseqs(caps)
            except ValueError:
                _sshv1respondbytes(fout, b'')
                continue

            # Ignore upgrade requests that do not ask for version 2.
            wantedprotos = caps.get(b'proto', [b''])[0]
            if SSHV2 not in wantedprotos:
                _sshv1respondbytes(fout, b'')
                continue

            # The request can be honored. Filter the remaining lines of
            # the legacy handshake next.
            state = 'upgrade-v2-filter-legacy-handshake'
            continue

        elif state == 'upgrade-v2-filter-legacy-handshake':
            # After an ``upgrade`` request the client should have sent the
            # legacy handshake. Expected lines:
            #
            #    hello
            #    between
            #    pairs 81
            #    0000...-0000...
            for expected in (b'hello', b'between', b'pairs 81'):
                line = fin.readline()[:-1]

                if line != expected:
                    _sshv1respondooberror(fout, ui.ferr,
                                          b'malformed handshake protocol: '
                                          b'missing %s' % expected)
                    state = 'shutdown'
                    break
            else:
                # All three lines matched; verify the ``between`` argument.
                line = fin.read(81)
                if line != b'%s-%s' % (b'0' * 40, b'0' * 40):
                    _sshv1respondooberror(fout, ui.ferr,
                                          b'malformed handshake protocol: '
                                          b'missing between argument value')
                    state = 'shutdown'
                else:
                    state = 'upgrade-v2-finish'
            continue

        elif state == 'upgrade-v2-finish':
            # Send the upgrade response followed by the capabilities.
            fout.write(b'upgraded %s %s\n' % (token, SSHV2))
            servercaps = wireproto.capabilities(repo, proto)
            rsp = b'capabilities: %s' % servercaps.data
            fout.write(b'%d\n%s\n' % (len(rsp), rsp))
            fout.flush()

            proto = sshv2protocolhandler(ui, fin, fout)
            upgraded = True

            state = 'protov2-serving'
            continue

        else:
            raise error.ProgrammingError('unhandled ssh server state: %s' %
                                         state)
627 627
class sshserver(object):
    """Serves the SSH wire protocol over the streams of a ``ui`` object."""

    def __init__(self, ui, repo):
        self._ui = ui
        self._repo = repo
        # Keep the original streams for protocol traffic before ``fout``
        # is replaced below.
        self._fin = ui.fin
        self._fout = ui.fout

        hook.redirect(True)
        # From now on both ui objects write their output to stderr.
        # NOTE(review): presumably so that ordinary output cannot end up
        # in the protocol stream -- confirm.
        errstream = ui.ferr
        ui.fout = errstream
        repo.ui.fout = errstream

        # Prevent insertion/deletion of CRs
        for stream in (self._fin, self._fout):
            util.setbinary(stream)

    def serve_forever(self):
        """Run the server loop, then exit the process with status 0."""
        _runsshserver(self._ui, self._repo, self._fin, self._fout)
        sys.exit(0)
General Comments 0
You need to be logged in to leave comments. Login now