##// END OF EJS Templates
sshpeer: return framed file object when needed...
Gregory Szorc -
r36385:043e77f3 default
parent child Browse files
Show More
@@ -1,560 +1,554
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11 import uuid
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 error,
16 16 pycompat,
17 17 util,
18 18 wireproto,
19 19 wireprotoserver,
20 20 )
21 21
22 22 def _serverquote(s):
23 23 """quote a string for the remote shell ... which we assume is sh"""
24 24 if not s:
25 25 return s
26 26 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
27 27 return s
28 28 return "'%s'" % s.replace("'", "'\\''")
29 29
30 30 def _forwardoutput(ui, pipe):
31 31 """display all data currently available on pipe as remote output.
32 32
33 33 This is non blocking."""
34 34 s = util.readpipe(pipe)
35 35 if s:
36 36 for l in s.splitlines():
37 37 ui.status(_("remote: "), l, '\n')
38 38
39 39 class doublepipe(object):
40 40 """Operate a side-channel pipe in addition of a main one
41 41
42 42 The side-channel pipe contains server output to be forwarded to the user
43 43 input. The double pipe will behave as the "main" pipe, but will ensure the
44 44 content of the "side" pipe is properly processed while we wait for blocking
45 45 call on the "main" pipe.
46 46
47 47 If large amounts of data are read from "main", the forward will cease after
48 48 the first bytes start to appear. This simplifies the implementation
49 49 without affecting actual output of sshpeer too much as we rarely issue
50 50 large read for data not yet emitted by the server.
51 51
52 52 The main pipe is expected to be a 'bufferedinputpipe' from the util module
53 53 that handle all the os specific bits. This class lives in this module
54 54 because it focus on behavior specific to the ssh protocol."""
55 55
56 56 def __init__(self, ui, main, side):
57 57 self._ui = ui
58 58 self._main = main
59 59 self._side = side
60 60
61 61 def _wait(self):
62 62 """wait until some data are available on main or side
63 63
64 64 return a pair of boolean (ismainready, issideready)
65 65
66 66 (This will only wait for data if the setup is supported by `util.poll`)
67 67 """
68 68 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
69 69 return (True, True) # main has data, assume side is worth poking at.
70 70 fds = [self._main.fileno(), self._side.fileno()]
71 71 try:
72 72 act = util.poll(fds)
73 73 except NotImplementedError:
74 74 # non supported yet case, assume all have data.
75 75 act = fds
76 76 return (self._main.fileno() in act, self._side.fileno() in act)
77 77
78 78 def write(self, data):
79 79 return self._call('write', data)
80 80
81 81 def read(self, size):
82 82 r = self._call('read', size)
83 83 if size != 0 and not r:
84 84 # We've observed a condition that indicates the
85 85 # stdout closed unexpectedly. Check stderr one
86 86 # more time and snag anything that's there before
87 87 # letting anyone know the main part of the pipe
88 88 # closed prematurely.
89 89 _forwardoutput(self._ui, self._side)
90 90 return r
91 91
92 92 def readline(self):
93 93 return self._call('readline')
94 94
95 95 def _call(self, methname, data=None):
96 96 """call <methname> on "main", forward output of "side" while blocking
97 97 """
98 98 # data can be '' or 0
99 99 if (data is not None and not data) or self._main.closed:
100 100 _forwardoutput(self._ui, self._side)
101 101 return ''
102 102 while True:
103 103 mainready, sideready = self._wait()
104 104 if sideready:
105 105 _forwardoutput(self._ui, self._side)
106 106 if mainready:
107 107 meth = getattr(self._main, methname)
108 108 if data is None:
109 109 return meth()
110 110 else:
111 111 return meth(data)
112 112
113 113 def close(self):
114 114 return self._main.close()
115 115
116 116 def flush(self):
117 117 return self._main.flush()
118 118
119 119 def _cleanuppipes(ui, pipei, pipeo, pipee):
120 120 """Clean up pipes used by an SSH connection."""
121 121 if pipeo:
122 122 pipeo.close()
123 123 if pipei:
124 124 pipei.close()
125 125
126 126 if pipee:
127 127 # Try to read from the err descriptor until EOF.
128 128 try:
129 129 for l in pipee:
130 130 ui.status(_('remote: '), l)
131 131 except (IOError, ValueError):
132 132 pass
133 133
134 134 pipee.close()
135 135
136 136 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
137 137 """Create an SSH connection to a server.
138 138
139 139 Returns a tuple of (process, stdin, stdout, stderr) for the
140 140 spawned process.
141 141 """
142 142 cmd = '%s %s %s' % (
143 143 sshcmd,
144 144 args,
145 145 util.shellquote('%s -R %s serve --stdio' % (
146 146 _serverquote(remotecmd), _serverquote(path))))
147 147
148 148 ui.debug('running %s\n' % cmd)
149 149 cmd = util.quotecommand(cmd)
150 150
151 151 # no buffer allow the use of 'select'
152 152 # feel free to remove buffering and select usage when we ultimately
153 153 # move to threading.
154 154 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
155 155
156 156 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
157 157 stdin = doublepipe(ui, stdin, stderr)
158 158
159 159 return proc, stdin, stdout, stderr
160 160
161 161 def _performhandshake(ui, stdin, stdout, stderr):
162 162 def badresponse():
163 163 msg = _('no suitable response from remote hg')
164 164 hint = ui.config('ui', 'ssherrorhint')
165 165 raise error.RepoError(msg, hint=hint)
166 166
167 167 # The handshake consists of sending wire protocol commands in reverse
168 168 # order of protocol implementation and then sniffing for a response
169 169 # to one of them.
170 170 #
171 171 # Those commands (from oldest to newest) are:
172 172 #
173 173 # ``between``
174 174 # Asks for the set of revisions between a pair of revisions. Command
175 175 # present in all Mercurial server implementations.
176 176 #
177 177 # ``hello``
178 178 # Instructs the server to advertise its capabilities. Introduced in
179 179 # Mercurial 0.9.1.
180 180 #
181 181 # ``upgrade``
182 182 # Requests upgrade from default transport protocol version 1 to
183 183 # a newer version. Introduced in Mercurial 4.6 as an experimental
184 184 # feature.
185 185 #
186 186 # The ``between`` command is issued with a request for the null
187 187 # range. If the remote is a Mercurial server, this request will
188 188 # generate a specific response: ``1\n\n``. This represents the
189 189 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
190 190 # in the output stream and know this is the response to ``between``
191 191 # and we're at the end of our handshake reply.
192 192 #
193 193 # The response to the ``hello`` command will be a line with the
194 194 # length of the value returned by that command followed by that
195 195 # value. If the server doesn't support ``hello`` (which should be
196 196 # rare), that line will be ``0\n``. Otherwise, the value will contain
197 197 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
198 198 # the capabilities of the server.
199 199 #
200 200 # The ``upgrade`` command isn't really a command in the traditional
201 201 # sense of version 1 of the transport because it isn't using the
202 202 # proper mechanism for formatting insteads: instead, it just encodes
203 203 # arguments on the line, delimited by spaces.
204 204 #
205 205 # The ``upgrade`` line looks like ``upgrade <token> <capabilities>``.
206 206 # If the server doesn't support protocol upgrades, it will reply to
207 207 # this line with ``0\n``. Otherwise, it emits an
208 208 # ``upgraded <token> <protocol>`` line to both stdout and stderr.
209 209 # Content immediately following this line describes additional
210 210 # protocol and server state.
211 211 #
212 212 # In addition to the responses to our command requests, the server
213 213 # may emit "banner" output on stdout. SSH servers are allowed to
214 214 # print messages to stdout on login. Issuing commands on connection
215 215 # allows us to flush this banner output from the server by scanning
216 216 # for output to our well-known ``between`` command. Of course, if
217 217 # the banner contains ``1\n\n``, this will throw off our detection.
218 218
219 219 requestlog = ui.configbool('devel', 'debug.peer-request')
220 220
221 221 # Generate a random token to help identify responses to version 2
222 222 # upgrade request.
223 223 token = pycompat.sysbytes(str(uuid.uuid4()))
224 224 upgradecaps = [
225 225 ('proto', wireprotoserver.SSHV2),
226 226 ]
227 227 upgradecaps = util.urlreq.urlencode(upgradecaps)
228 228
229 229 try:
230 230 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
231 231 handshake = [
232 232 'hello\n',
233 233 'between\n',
234 234 'pairs %d\n' % len(pairsarg),
235 235 pairsarg,
236 236 ]
237 237
238 238 # Request upgrade to version 2 if configured.
239 239 if ui.configbool('experimental', 'sshpeer.advertise-v2'):
240 240 ui.debug('sending upgrade request: %s %s\n' % (token, upgradecaps))
241 241 handshake.insert(0, 'upgrade %s %s\n' % (token, upgradecaps))
242 242
243 243 if requestlog:
244 244 ui.debug('devel-peer-request: hello\n')
245 245 ui.debug('sending hello command\n')
246 246 if requestlog:
247 247 ui.debug('devel-peer-request: between\n')
248 248 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
249 249 ui.debug('sending between command\n')
250 250
251 251 stdin.write(''.join(handshake))
252 252 stdin.flush()
253 253 except IOError:
254 254 badresponse()
255 255
256 256 # Assume version 1 of wire protocol by default.
257 257 protoname = wireprotoserver.SSHV1
258 258 reupgraded = re.compile(b'^upgraded %s (.*)$' % re.escape(token))
259 259
260 260 lines = ['', 'dummy']
261 261 max_noise = 500
262 262 while lines[-1] and max_noise:
263 263 try:
264 264 l = stdout.readline()
265 265 _forwardoutput(ui, stderr)
266 266
267 267 # Look for reply to protocol upgrade request. It has a token
268 268 # in it, so there should be no false positives.
269 269 m = reupgraded.match(l)
270 270 if m:
271 271 protoname = m.group(1)
272 272 ui.debug('protocol upgraded to %s\n' % protoname)
273 273 # If an upgrade was handled, the ``hello`` and ``between``
274 274 # requests are ignored. The next output belongs to the
275 275 # protocol, so stop scanning lines.
276 276 break
277 277
278 278 # Otherwise it could be a banner, ``0\n`` response if server
279 279 # doesn't support upgrade.
280 280
281 281 if lines[-1] == '1\n' and l == '\n':
282 282 break
283 283 if l:
284 284 ui.debug('remote: ', l)
285 285 lines.append(l)
286 286 max_noise -= 1
287 287 except IOError:
288 288 badresponse()
289 289 else:
290 290 badresponse()
291 291
292 292 caps = set()
293 293
294 294 # For version 1, we should see a ``capabilities`` line in response to the
295 295 # ``hello`` command.
296 296 if protoname == wireprotoserver.SSHV1:
297 297 for l in reversed(lines):
298 298 # Look for response to ``hello`` command. Scan from the back so
299 299 # we don't misinterpret banner output as the command reply.
300 300 if l.startswith('capabilities:'):
301 301 caps.update(l[:-1].split(':')[1].split())
302 302 break
303 303 elif protoname == wireprotoserver.SSHV2:
304 304 # We see a line with number of bytes to follow and then a value
305 305 # looking like ``capabilities: *``.
306 306 line = stdout.readline()
307 307 try:
308 308 valuelen = int(line)
309 309 except ValueError:
310 310 badresponse()
311 311
312 312 capsline = stdout.read(valuelen)
313 313 if not capsline.startswith('capabilities: '):
314 314 badresponse()
315 315
316 316 ui.debug('remote: %s\n' % capsline)
317 317
318 318 caps.update(capsline.split(':')[1].split())
319 319 # Trailing newline.
320 320 stdout.read(1)
321 321
322 322 # Error if we couldn't find capabilities, this means:
323 323 #
324 324 # 1. Remote isn't a Mercurial server
325 325 # 2. Remote is a <0.9.1 Mercurial server
326 326 # 3. Remote is a future Mercurial server that dropped ``hello``
327 327 # and other attempted handshake mechanisms.
328 328 if not caps:
329 329 badresponse()
330 330
331 331 return protoname, caps
332 332
333 333 class sshv1peer(wireproto.wirepeer):
334 334 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
335 335 """Create a peer from an existing SSH connection.
336 336
337 337 ``proc`` is a handle on the underlying SSH process.
338 338 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
339 339 pipes for that process.
340 340 ``caps`` is a set of capabilities supported by the remote.
341 341 """
342 342 self._url = url
343 343 self._ui = ui
344 344 # self._subprocess is unused. Keeping a handle on the process
345 345 # holds a reference and prevents it from being garbage collected.
346 346 self._subprocess = proc
347 347 self._pipeo = stdin
348 348 self._pipei = stdout
349 349 self._pipee = stderr
350 350 self._caps = caps
351 351
352 # Commands that have a "framed" response where the first line of the
353 # response contains the length of that response.
354 _FRAMED_COMMANDS = {
355 'batch',
356 }
357
352 358 # Begin of _basepeer interface.
353 359
354 360 @util.propertycache
355 361 def ui(self):
356 362 return self._ui
357 363
358 364 def url(self):
359 365 return self._url
360 366
361 367 def local(self):
362 368 return None
363 369
364 370 def peer(self):
365 371 return self
366 372
367 373 def canpush(self):
368 374 return True
369 375
370 376 def close(self):
371 377 pass
372 378
373 379 # End of _basepeer interface.
374 380
375 381 # Begin of _basewirecommands interface.
376 382
377 383 def capabilities(self):
378 384 return self._caps
379 385
380 386 # End of _basewirecommands interface.
381 387
382 388 def _readerr(self):
383 389 _forwardoutput(self.ui, self._pipee)
384 390
385 391 def _abort(self, exception):
386 392 self._cleanup()
387 393 raise exception
388 394
389 395 def _cleanup(self):
390 396 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
391 397
392 398 __del__ = _cleanup
393 399
394 def _submitbatch(self, req):
395 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
396 available = self._getamount()
397 # TODO this response parsing is probably suboptimal for large
398 # batches with large responses.
399 toread = min(available, 1024)
400 work = rsp.read(toread)
401 available -= toread
402 chunk = work
403 while chunk:
404 while ';' in work:
405 one, work = work.split(';', 1)
406 yield wireproto.unescapearg(one)
407 toread = min(available, 1024)
408 chunk = rsp.read(toread)
409 available -= toread
410 work += chunk
411 yield wireproto.unescapearg(work)
412
413 def _sendrequest(self, cmd, args):
400 def _sendrequest(self, cmd, args, framed=False):
414 401 if (self.ui.debugflag
415 402 and self.ui.configbool('devel', 'debug.peer-request')):
416 403 dbg = self.ui.debug
417 404 line = 'devel-peer-request: %s\n'
418 405 dbg(line % cmd)
419 406 for key, value in sorted(args.items()):
420 407 if not isinstance(value, dict):
421 408 dbg(line % ' %s: %d bytes' % (key, len(value)))
422 409 else:
423 410 for dk, dv in sorted(value.items()):
424 411 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
425 412 self.ui.debug("sending %s command\n" % cmd)
426 413 self._pipeo.write("%s\n" % cmd)
427 414 _func, names = wireproto.commands[cmd]
428 415 keys = names.split()
429 416 wireargs = {}
430 417 for k in keys:
431 418 if k == '*':
432 419 wireargs['*'] = args
433 420 break
434 421 else:
435 422 wireargs[k] = args[k]
436 423 del args[k]
437 424 for k, v in sorted(wireargs.iteritems()):
438 425 self._pipeo.write("%s %d\n" % (k, len(v)))
439 426 if isinstance(v, dict):
440 427 for dk, dv in v.iteritems():
441 428 self._pipeo.write("%s %d\n" % (dk, len(dv)))
442 429 self._pipeo.write(dv)
443 430 else:
444 431 self._pipeo.write(v)
445 432 self._pipeo.flush()
446 433
434 # We know exactly how many bytes are in the response. So return a proxy
435 # around the raw output stream that allows reading exactly this many
436 # bytes. Callers then can read() without fear of overrunning the
437 # response.
438 if framed:
439 amount = self._getamount()
440 return util.cappedreader(self._pipei, amount)
441
447 442 return self._pipei
448 443
449 444 def _callstream(self, cmd, **args):
450 445 args = pycompat.byteskwargs(args)
451 return self._sendrequest(cmd, args)
446 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
452 447
453 448 def _callcompressable(self, cmd, **args):
454 449 args = pycompat.byteskwargs(args)
455 return self._sendrequest(cmd, args)
450 return self._sendrequest(cmd, args, framed=cmd in self._FRAMED_COMMANDS)
456 451
457 452 def _call(self, cmd, **args):
458 453 args = pycompat.byteskwargs(args)
459 self._sendrequest(cmd, args)
460 return self._readframed()
454 return self._sendrequest(cmd, args, framed=True).read()
461 455
462 456 def _callpush(self, cmd, fp, **args):
463 457 r = self._call(cmd, **args)
464 458 if r:
465 459 return '', r
466 460 for d in iter(lambda: fp.read(4096), ''):
467 461 self._writeframed(d)
468 462 self._writeframed("", flush=True)
469 463 r = self._readframed()
470 464 if r:
471 465 return '', r
472 466 return self._readframed(), ''
473 467
474 468 def _calltwowaystream(self, cmd, fp, **args):
475 469 r = self._call(cmd, **args)
476 470 if r:
477 471 # XXX needs to be made better
478 472 raise error.Abort(_('unexpected remote reply: %s') % r)
479 473 for d in iter(lambda: fp.read(4096), ''):
480 474 self._writeframed(d)
481 475 self._writeframed("", flush=True)
482 476 return self._pipei
483 477
484 478 def _getamount(self):
485 479 l = self._pipei.readline()
486 480 if l == '\n':
487 481 self._readerr()
488 482 msg = _('check previous remote output')
489 483 self._abort(error.OutOfBandError(hint=msg))
490 484 self._readerr()
491 485 try:
492 486 return int(l)
493 487 except ValueError:
494 488 self._abort(error.ResponseError(_("unexpected response:"), l))
495 489
496 490 def _readframed(self):
497 491 return self._pipei.read(self._getamount())
498 492
499 493 def _writeframed(self, data, flush=False):
500 494 self._pipeo.write("%d\n" % len(data))
501 495 if data:
502 496 self._pipeo.write(data)
503 497 if flush:
504 498 self._pipeo.flush()
505 499 self._readerr()
506 500
507 501 class sshv2peer(sshv1peer):
508 502 """A peer that speakers version 2 of the transport protocol."""
509 503 # Currently version 2 is identical to version 1 post handshake.
510 504 # And handshake is performed before the peer is instantiated. So
511 505 # we need no custom code.
512 506
513 507 def instance(ui, path, create):
514 508 """Create an SSH peer.
515 509
516 510 The returned object conforms to the ``wireproto.wirepeer`` interface.
517 511 """
518 512 u = util.url(path, parsequery=False, parsefragment=False)
519 513 if u.scheme != 'ssh' or not u.host or u.path is None:
520 514 raise error.RepoError(_("couldn't parse location %s") % path)
521 515
522 516 util.checksafessh(path)
523 517
524 518 if u.passwd is not None:
525 519 raise error.RepoError(_('password in URL not supported'))
526 520
527 521 sshcmd = ui.config('ui', 'ssh')
528 522 remotecmd = ui.config('ui', 'remotecmd')
529 523 sshaddenv = dict(ui.configitems('sshenv'))
530 524 sshenv = util.shellenviron(sshaddenv)
531 525 remotepath = u.path or '.'
532 526
533 527 args = util.sshargs(sshcmd, u.host, u.user, u.port)
534 528
535 529 if create:
536 530 cmd = '%s %s %s' % (sshcmd, args,
537 531 util.shellquote('%s init %s' %
538 532 (_serverquote(remotecmd), _serverquote(remotepath))))
539 533 ui.debug('running %s\n' % cmd)
540 534 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
541 535 if res != 0:
542 536 raise error.RepoError(_('could not create remote repo'))
543 537
544 538 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
545 539 remotepath, sshenv)
546 540
547 541 try:
548 542 protoname, caps = _performhandshake(ui, stdin, stdout, stderr)
549 543 except Exception:
550 544 _cleanuppipes(ui, stdout, stdin, stderr)
551 545 raise
552 546
553 547 if protoname == wireprotoserver.SSHV1:
554 548 return sshv1peer(ui, path, proc, stdin, stdout, stderr, caps)
555 549 elif protoname == wireprotoserver.SSHV2:
556 550 return sshv2peer(ui, path, proc, stdin, stdout, stderr, caps)
557 551 else:
558 552 _cleanuppipes(ui, stdout, stdin, stderr)
559 553 raise error.RepoError(_('unknown version of SSH protocol: %s') %
560 554 protoname)
General Comments 0
You need to be logged in to leave comments. Login now