##// END OF EJS Templates
sshpeer: check pipe validity before forwarding output from it...
Matt Harbison -
r36851:46f4d71e default
parent child Browse files
Show More
@@ -1,612 +1,613 b''
# sshpeer.py - ssh repository proxy class for mercurial
#
# Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11 import uuid
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 error,
16 16 pycompat,
17 17 util,
18 18 wireproto,
19 19 wireprotoserver,
20 20 wireprototypes,
21 21 )
22 22
23 23 def _serverquote(s):
24 24 """quote a string for the remote shell ... which we assume is sh"""
25 25 if not s:
26 26 return s
27 27 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
28 28 return s
29 29 return "'%s'" % s.replace("'", "'\\''")
30 30
31 31 def _forwardoutput(ui, pipe):
32 32 """display all data currently available on pipe as remote output.
33 33
34 34 This is non blocking."""
35 s = util.readpipe(pipe)
36 if s:
37 for l in s.splitlines():
38 ui.status(_("remote: "), l, '\n')
35 if pipe:
36 s = util.readpipe(pipe)
37 if s:
38 for l in s.splitlines():
39 ui.status(_("remote: "), l, '\n')
39 40
class doublepipe(object):
    """Operate a side-channel pipe in addition to a main one.

    The side-channel pipe carries server output that must be forwarded to
    the user. The double pipe behaves like the "main" pipe, but makes sure
    the content of the "side" pipe is processed while we wait on a blocking
    call on the "main" pipe.

    If large amounts of data are read from "main", forwarding ceases once
    the first bytes start to appear. This simplifies the implementation
    without affecting the actual output of sshpeer too much, as we rarely
    issue large reads for data not yet emitted by the server.

    The main pipe is expected to be a 'bufferedinputpipe' from the util
    module, which handles all the OS specific bits. This class lives in
    this module because it focuses on behavior specific to the ssh
    protocol."""

    def __init__(self, ui, main, side):
        self._ui = ui
        self._main = main
        self._side = side

    def _wait(self):
        """Wait until some data is available on main or side.

        Returns a pair of booleans (ismainready, issideready).

        (This will only wait for data if the setup is supported by
        `util.poll`.)
        """
        main = self._main
        if isinstance(main, util.bufferedinputpipe) and main.hasbuffer:
            # Main has data. Assume side is worth poking at.
            return True, True

        watched = [main.fileno(), self._side.fileno()]
        try:
            ready = util.poll(watched)
        except NotImplementedError:
            # Not supported yet: assume every descriptor has data.
            ready = watched
        return (self._main.fileno() in ready, self._side.fileno() in ready)

    def write(self, data):
        return self._call('write', data)

    def read(self, size):
        result = self._call('read', size)
        if not result and size != 0:
            # An empty result for a non-empty request indicates that
            # stdout closed unexpectedly. Check stderr one more time and
            # snag anything that's there before letting anyone know the
            # main part of the pipe closed prematurely.
            _forwardoutput(self._ui, self._side)
        return result

    def readline(self):
        return self._call('readline')

    def _call(self, methname, data=None):
        """Call <methname> on "main", forwarding "side" output while blocking.

        ``data`` is passed as the single argument when it is not None.
        """
        # data can be '' or 0: there is nothing to transfer in that case.
        emptyrequest = data is not None and not data
        if emptyrequest or self._main.closed:
            _forwardoutput(self._ui, self._side)
            return ''
        while True:
            mainready, sideready = self._wait()
            if sideready:
                _forwardoutput(self._ui, self._side)
            if not mainready:
                continue
            meth = getattr(self._main, methname)
            if data is None:
                return meth()
            return meth(data)

    def close(self):
        return self._main.close()

    def flush(self):
        return self._main.flush()
122 123
123 124 def _cleanuppipes(ui, pipei, pipeo, pipee):
124 125 """Clean up pipes used by an SSH connection."""
125 126 if pipeo:
126 127 pipeo.close()
127 128 if pipei:
128 129 pipei.close()
129 130
130 131 if pipee:
131 132 # Try to read from the err descriptor until EOF.
132 133 try:
133 134 for l in pipee:
134 135 ui.status(_('remote: '), l)
135 136 except (IOError, ValueError):
136 137 pass
137 138
138 139 pipee.close()
139 140
def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
    """Create an SSH connection to a server.

    The spawned command line is ``sshcmd args <quoted serve command>``,
    where the serve command runs ``remotecmd -R path serve --stdio``.
    ``sshenv`` is handed to ``util.popen4`` as the environment.

    Returns a tuple of (process, stdin, stdout, stderr) for the
    spawned process.
    """
    servecmd = '%s -R %s serve --stdio' % (
        _serverquote(remotecmd), _serverquote(path))
    cmd = '%s %s %s' % (sshcmd, args, util.shellquote(servecmd))

    ui.debug('running %s\n' % cmd)
    quoted = util.quotecommand(cmd)

    # No buffering allows the use of 'select'.
    # Feel free to remove buffering and select usage when we ultimately
    # move to threading.
    stdin, stdout, stderr, proc = util.popen4(quoted, bufsize=0, env=sshenv)

    return proc, stdin, stdout, stderr
161 162
def _performhandshake(ui, stdin, stdout, stderr):
    """Perform the SSH wire protocol handshake over existing pipes.

    Writes the handshake requests to ``stdin``, scans ``stdout`` for the
    replies and forwards anything seen on ``stderr`` to the user.

    Returns a ``(protoname, caps)`` pair: the name of the negotiated
    protocol and the set of capabilities advertised by the remote.

    Raises ``error.RepoError`` when no usable reply is found.
    """
    def badresponse():
        # Flush any output on stderr.
        _forwardoutput(ui, stderr)

        msg = _('no suitable response from remote hg')
        raise error.RepoError(msg, hint=ui.config('ui', 'ssherrorhint'))

    # The handshake consists of sending wire protocol commands in reverse
    # order of protocol implementation and then sniffing for a response
    # to one of them.
    #
    # Those commands (from oldest to newest) are:
    #
    # ``between``
    #   Asks for the set of revisions between a pair of revisions. Command
    #   present in all Mercurial server implementations.
    #
    # ``hello``
    #   Instructs the server to advertise its capabilities. Introduced in
    #   Mercurial 0.9.1.
    #
    # ``upgrade``
    #   Requests upgrade from default transport protocol version 1 to
    #   a newer version. Introduced in Mercurial 4.6 as an experimental
    #   feature.
    #
    # The ``between`` command is issued with a request for the null
    # range. If the remote is a Mercurial server, this request will
    # generate a specific response: ``1\n\n``. This represents the
    # wire protocol encoded value for ``\n``. We look for ``1\n\n``
    # in the output stream and know this is the response to ``between``
    # and we're at the end of our handshake reply.
    #
    # The response to the ``hello`` command will be a line with the
    # length of the value returned by that command followed by that
    # value. If the server doesn't support ``hello`` (which should be
    # rare), that line will be ``0\n``. Otherwise, the value will contain
    # RFC 822 like lines. Of these, the ``capabilities:`` line contains
    # the capabilities of the server.
    #
    # The ``upgrade`` command isn't really a command in the traditional
    # sense of version 1 of the transport because it isn't using the
    # proper mechanism for formatting arguments: instead, it just encodes
    # arguments on the line, delimited by spaces.
    #
    # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
    # If the server doesn't support protocol upgrades, it will reply to
    # this line with ``0\n``. Otherwise, it emits an
    # ``upgraded <token> <protocol>`` line to both stdout and stderr.
    # Content immediately following this line describes additional
    # protocol and server state.
    #
    # In addition to the responses to our command requests, the server
    # may emit "banner" output on stdout. SSH servers are allowed to
    # print messages to stdout on login. Issuing commands on connection
    # allows us to flush this banner output from the server by scanning
    # for output to our well-known ``between`` command. Of course, if
    # the banner contains ``1\n\n``, this will throw off our detection.

    requestlog = ui.configbool('devel', 'debug.peer-request')

    # Generate a random token to help identify responses to version 2
    # upgrade request.
    token = pycompat.sysbytes(str(uuid.uuid4()))
    upgradecaps = util.urlreq.urlencode([
        ('proto', wireprotoserver.SSHV2),
    ])

    try:
        pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
        handshake = [
            'hello\n',
            'between\n',
            'pairs %d\n' % len(pairsarg),
            pairsarg,
        ]

        # Request upgrade to version 2 if configured.
        if ui.configbool('experimental', 'sshpeer.advertise-v2'):
            ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
            handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))

        if requestlog:
            ui.debug('devel-peer-request: hello\n')
        ui.debug('sending hello command\n')
        if requestlog:
            ui.debug('devel-peer-request: between\n')
            ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
        ui.debug('sending between command\n')

        stdin.write(''.join(handshake))
        stdin.flush()
    except IOError:
        badresponse()

    # Assume version 1 of wire protocol by default.
    protoname = wireprototypes.SSHV1
    reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))

    # ``lines`` starts with a non-empty sentinel so the loop is entered;
    # an empty line read from stdout ends the scan.
    lines = ['', 'dummy']
    max_noise = 500
    while lines[-1] and max_noise:
        try:
            reply = stdout.readline()
            _forwardoutput(ui, stderr)

            # Look for reply to protocol upgrade request. It has a token
            # in it, so there should be no false positives.
            upgraded = reupgraded.match(reply)
            if upgraded:
                protoname = upgraded.group(1)
                ui.debug('protocol upgraded to %s\n' % protoname)
                # If an upgrade was handled, the ``hello`` and ``between``
                # requests are ignored. The next output belongs to the
                # protocol, so stop scanning lines.
                break

            # Otherwise it could be a banner, or the ``0\n`` response of
            # a server that doesn't support upgrade.

            if lines[-1] == '1\n' and reply == '\n':
                # End of the ``between`` reply: the handshake is complete.
                break
            if reply:
                ui.debug('remote: ', reply)
            lines.append(reply)
            max_noise -= 1
        except IOError:
            badresponse()
    else:
        # The loop ended without a ``break``: stdout returned an empty
        # line or the noise budget ran out before a reply was recognized.
        badresponse()

    caps = set()

    # For version 1, we should see a ``capabilities`` line in response to the
    # ``hello`` command.
    if protoname == wireprototypes.SSHV1:
        for seen in reversed(lines):
            # Look for response to ``hello`` command. Scan from the back so
            # we don't misinterpret banner output as the command reply.
            if seen.startswith('capabilities:'):
                caps.update(seen[:-1].split(':')[1].split())
                break
    elif protoname == wireprotoserver.SSHV2:
        # We see a line with number of bytes to follow and then a value
        # looking like ``capabilities: *``.
        lengthline = stdout.readline()
        try:
            valuelen = int(lengthline)
        except ValueError:
            badresponse()

        capsline = stdout.read(valuelen)
        if not capsline.startswith('capabilities: '):
            badresponse()

        ui.debug('remote: %s\n' % capsline)

        caps.update(capsline.split(':')[1].split())
        # Trailing newline.
        stdout.read(1)

    # Error if we couldn't find capabilities, this means:
    #
    # 1. Remote isn't a Mercurial server
    # 2. Remote is a <0.9.1 Mercurial server
    # 3. Remote is a future Mercurial server that dropped ``hello``
    #    and other attempted handshake mechanisms.
    if not caps:
        badresponse()

    # Flush any output on stderr before proceeding.
    _forwardoutput(ui, stderr)

    return protoname, caps
339 340
class sshv1peer(wireproto.wirepeer):
    """A peer speaking version 1 of the SSH transport over existing pipes."""

    def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
                 autoreadstderr=True):
        """Create a peer from an existing SSH connection.

        ``proc`` is a handle on the underlying SSH process.
        ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
        pipes for that process.
        ``caps`` is a set of capabilities supported by the remote.
        ``autoreadstderr`` denotes whether to automatically read from
        stderr and to forward its output.
        """
        self._url = url
        self._ui = ui
        # self._subprocess is unused. Keeping a handle on the process
        # holds a reference and prevents it from being garbage collected.
        self._subprocess = proc

        # And we hook up our "doublepipe" wrapper to allow querying
        # stderr any time we perform I/O.
        if autoreadstderr:
            stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
            stdin = doublepipe(ui, stdin, stderr)

        self._pipeo = stdin
        self._pipei = stdout
        self._pipee = stderr
        self._caps = caps
        self._autoreadstderr = autoreadstderr

    # Commands that have a "framed" response where the first line of the
    # response contains the length of that response.
    _FRAMED_COMMANDS = {
        'batch',
    }

    # Begin of _basepeer interface.

    @util.propertycache
    def ui(self):
        return self._ui

    def url(self):
        return self._url

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        return True

    def close(self):
        pass

    # End of _basepeer interface.

    # Begin of _basewirecommands interface.

    def capabilities(self):
        return self._caps

    # End of _basewirecommands interface.

    def _readerr(self):
        """Forward whatever is currently available on stderr to the user."""
        _forwardoutput(self.ui, self._pipee)

    def _abort(self, exception):
        """Clean up the pipes, then raise ``exception``."""
        self._cleanup()
        raise exception

    def _cleanup(self):
        """Close all three pipes, forwarding pending stderr output first."""
        _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)

    # Also release the pipes when the peer is garbage collected.
    __del__ = _cleanup

    def _sendrequest(self, cmd, args, framed=False):
        """Send command ``cmd`` with arguments ``args`` to the remote.

        ``args`` is a dict of argument values; entries that match a
        declared argument name of the command are removed from it.

        Returns the stream to read the response from. When ``framed`` is
        true, the response length is read first and the returned stream is
        capped to exactly that many bytes.
        """
        if (self.ui.debugflag
            and self.ui.configbool('devel', 'debug.peer-request')):
            dbg = self.ui.debug
            line = 'devel-peer-request: %s\n'
            dbg(line % cmd)
            for key, value in sorted(args.items()):
                if not isinstance(value, dict):
                    dbg(line % ' %s: %d bytes' % (key, len(value)))
                else:
                    for dk, dv in sorted(value.items()):
                        dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
        self.ui.debug("sending %s command\n" % cmd)
        self._pipeo.write("%s\n" % cmd)
        _func, names = wireproto.commands[cmd]
        keys = names.split()
        wireargs = {}
        for k in keys:
            if k == '*':
                # Everything not consumed by a named argument so far is
                # sent as the catch-all dict argument.
                wireargs['*'] = args
                break
            else:
                wireargs[k] = args[k]
                del args[k]
        # ``items()`` rather than the Python 2-only ``iteritems()``, in
        # line with the debug loop above.
        for k, v in sorted(wireargs.items()):
            self._pipeo.write("%s %d\n" % (k, len(v)))
            if isinstance(v, dict):
                for dk, dv in v.items():
                    self._pipeo.write("%s %d\n" % (dk, len(dv)))
                    self._pipeo.write(dv)
            else:
                self._pipeo.write(v)
        self._pipeo.flush()

        # We know exactly how many bytes are in the response. So return a proxy
        # around the raw output stream that allows reading exactly this many
        # bytes. Callers then can read() without fear of overrunning the
        # response.
        if framed:
            amount = self._getamount()
            return util.cappedreader(self._pipei, amount)

        return self._pipei

    def _callstream(self, cmd, **args):
        args = pycompat.byteskwargs(args)
        return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)

    def _callcompressable(self, cmd, **args):
        args = pycompat.byteskwargs(args)
        return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)

    def _call(self, cmd, **args):
        args = pycompat.byteskwargs(args)
        return self._sendrequest(cmd, args, framed=True).read()

    def _callpush(self, cmd, fp, **args):
        """Send ``cmd`` followed by the content of ``fp`` as framed payload.

        Returns a ``(result, error)`` pair of strings, one of which is
        empty.
        """
        # The server responds with an empty frame if the client should
        # continue submitting the payload.
        r = self._call(cmd, **args)
        if r:
            return '', r

        # The payload consists of frames with content followed by an empty
        # frame.
        for d in iter(lambda: fp.read(4096), ''):
            self._writeframed(d)
        self._writeframed("", flush=True)

        # In case of success, there is an empty frame and a frame containing
        # the integer result (as a string).
        # In case of error, there is a non-empty frame containing the error.
        r = self._readframed()
        if r:
            return '', r
        return self._readframed(), ''

    def _calltwowaystream(self, cmd, fp, **args):
        """Send ``cmd`` and the content of ``fp``; return the reply stream.

        Raises ``error.Abort`` when the remote answers the command with a
        non-empty frame.
        """
        # The server responds with an empty frame if the client should
        # continue submitting the payload.
        r = self._call(cmd, **args)
        if r:
            # XXX needs to be made better
            raise error.Abort(_('unexpected remote reply: %s') % r)

        # The payload consists of frames with content followed by an empty
        # frame.
        for d in iter(lambda: fp.read(4096), ''):
            self._writeframed(d)
        self._writeframed("", flush=True)

        return self._pipei

    def _getamount(self):
        """Read the length line of a frame and return it as an integer.

        Aborts with ``error.OutOfBandError`` when the line is empty and
        with ``error.ResponseError`` when it is not a number.
        """
        l = self._pipei.readline()
        if l == '\n':
            if self._autoreadstderr:
                self._readerr()
            msg = _('check previous remote output')
            self._abort(error.OutOfBandError(hint=msg))
        if self._autoreadstderr:
            self._readerr()
        try:
            return int(l)
        except ValueError:
            self._abort(error.ResponseError(_("unexpected response:"), l))

    def _readframed(self):
        """Read one frame from the remote and return its content."""
        size = self._getamount()
        if not size:
            return b''

        return self._pipei.read(size)

    def _writeframed(self, data, flush=False):
        """Write ``data`` to the remote as one length-prefixed frame."""
        self._pipeo.write("%d\n" % len(data))
        if data:
            self._pipeo.write(data)
        if flush:
            self._pipeo.flush()
        if self._autoreadstderr:
            self._readerr()
540 541
class sshv2peer(sshv1peer):
    """A peer that speaks version 2 of the transport protocol."""
    # Currently version 2 is identical to version 1 post handshake.
    # And handshake is performed before the peer is instantiated. So
    # we need no custom code.
546 547
def makepeer(ui, path, proc, stdin, stdout, stderr, autoreadstderr=True):
    """Make a peer instance from existing pipes.

    ``path`` and ``proc`` are stored on the eventual peer instance and may
    not be used for anything meaningful.

    ``stdin``, ``stdout``, and ``stderr`` are the pipes connected to the
    SSH server's stdio handles.

    This function is factored out to allow creating peers that don't
    actually spawn a new process. It is useful for starting SSH protocol
    servers and clients via non-standard means, which can be useful for
    testing.

    The pipes are cleaned up before any handshake failure propagates and
    before ``error.RepoError`` is raised for an unknown protocol name.
    """
    try:
        protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
    except Exception:
        _cleanuppipes(ui, stdout, stdin, stderr)
        raise

    # Pick the peer class matching the negotiated protocol.
    if protoname == wireprototypes.SSHV1:
        peercls = sshv1peer
    elif protoname == wireprototypes.SSHV2:
        peercls = sshv2peer
    else:
        _cleanuppipes(ui, stdout, stdin, stderr)
        raise error.RepoError(_('unknown version of SSH protocol: %s') %
                              protoname)

    return peercls(ui, path, proc, stdin, stdout, stderr, caps,
                   autoreadstderr=autoreadstderr)
577 578
def instance(ui, path, create):
    """Create an SSH peer.

    ``path`` must be an ``ssh://`` URL with a host and without a
    password. When ``create`` is true, ``init`` is first run on the remote
    through ssh to create the repository.

    The returned object conforms to the ``wireproto.wirepeer`` interface.

    Raises ``error.RepoError`` when the URL is unusable or the remote
    repository could not be created.
    """
    u = util.url(path, parsequery=False, parsefragment=False)
    if u.scheme != 'ssh' or not u.host or u.path is None:
        raise error.RepoError(_("couldn't parse location %s") % path)

    util.checksafessh(path)

    if u.passwd is not None:
        raise error.RepoError(_('password in URL not supported'))

    sshcmd = ui.config('ui', 'ssh')
    remotecmd = ui.config('ui', 'remotecmd')
    sshenv = util.shellenviron(dict(ui.configitems('sshenv')))
    remotepath = u.path or '.'

    args = util.sshargs(sshcmd, u.host, u.user, u.port)

    if create:
        initcmd = '%s init %s' % (
            _serverquote(remotecmd), _serverquote(remotepath))
        cmd = '%s %s %s' % (sshcmd, args, util.shellquote(initcmd))
        ui.debug('running %s\n' % cmd)
        if ui.system(cmd, blockedtag='sshpeer', environ=sshenv) != 0:
            raise error.RepoError(_('could not create remote repo'))

    proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
                                                  remotepath, sshenv)

    return makepeer(ui, path, proc, stdin, stdout, stderr)
General Comments 0
You need to be logged in to leave comments. Login now