##// END OF EJS Templates
sshpeer: document the handshake mechanism...
Gregory Szorc -
r35957:a622a927 default
parent child Browse files
Show More
@@ -1,423 +1,456
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import re
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 error,
15 15 pycompat,
16 16 util,
17 17 wireproto,
18 18 )
19 19
20 20 def _serverquote(s):
21 21 """quote a string for the remote shell ... which we assume is sh"""
22 22 if not s:
23 23 return s
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 def _forwardoutput(ui, pipe):
29 29 """display all data currently available on pipe as remote output.
30 30
31 31 This is non blocking."""
32 32 s = util.readpipe(pipe)
33 33 if s:
34 34 for l in s.splitlines():
35 35 ui.status(_("remote: "), l, '\n')
36 36
37 37 class doublepipe(object):
38 38 """Operate a side-channel pipe in addition of a main one
39 39
40 40 The side-channel pipe contains server output to be forwarded to the user
41 41 input. The double pipe will behave as the "main" pipe, but will ensure the
42 42 content of the "side" pipe is properly processed while we wait for blocking
43 43 call on the "main" pipe.
44 44
45 45 If large amounts of data are read from "main", the forward will cease after
46 46 the first bytes start to appear. This simplifies the implementation
47 47 without affecting actual output of sshpeer too much as we rarely issue
48 48 large read for data not yet emitted by the server.
49 49
50 50 The main pipe is expected to be a 'bufferedinputpipe' from the util module
51 51 that handle all the os specific bits. This class lives in this module
52 52 because it focus on behavior specific to the ssh protocol."""
53 53
54 54 def __init__(self, ui, main, side):
55 55 self._ui = ui
56 56 self._main = main
57 57 self._side = side
58 58
59 59 def _wait(self):
60 60 """wait until some data are available on main or side
61 61
62 62 return a pair of boolean (ismainready, issideready)
63 63
64 64 (This will only wait for data if the setup is supported by `util.poll`)
65 65 """
66 66 if getattr(self._main, 'hasbuffer', False): # getattr for classic pipe
67 67 return (True, True) # main has data, assume side is worth poking at.
68 68 fds = [self._main.fileno(), self._side.fileno()]
69 69 try:
70 70 act = util.poll(fds)
71 71 except NotImplementedError:
72 72 # non supported yet case, assume all have data.
73 73 act = fds
74 74 return (self._main.fileno() in act, self._side.fileno() in act)
75 75
76 76 def write(self, data):
77 77 return self._call('write', data)
78 78
79 79 def read(self, size):
80 80 r = self._call('read', size)
81 81 if size != 0 and not r:
82 82 # We've observed a condition that indicates the
83 83 # stdout closed unexpectedly. Check stderr one
84 84 # more time and snag anything that's there before
85 85 # letting anyone know the main part of the pipe
86 86 # closed prematurely.
87 87 _forwardoutput(self._ui, self._side)
88 88 return r
89 89
90 90 def readline(self):
91 91 return self._call('readline')
92 92
93 93 def _call(self, methname, data=None):
94 94 """call <methname> on "main", forward output of "side" while blocking
95 95 """
96 96 # data can be '' or 0
97 97 if (data is not None and not data) or self._main.closed:
98 98 _forwardoutput(self._ui, self._side)
99 99 return ''
100 100 while True:
101 101 mainready, sideready = self._wait()
102 102 if sideready:
103 103 _forwardoutput(self._ui, self._side)
104 104 if mainready:
105 105 meth = getattr(self._main, methname)
106 106 if data is None:
107 107 return meth()
108 108 else:
109 109 return meth(data)
110 110
111 111 def close(self):
112 112 return self._main.close()
113 113
114 114 def flush(self):
115 115 return self._main.flush()
116 116
117 117 def _cleanuppipes(ui, pipei, pipeo, pipee):
118 118 """Clean up pipes used by an SSH connection."""
119 119 if pipeo:
120 120 pipeo.close()
121 121 if pipei:
122 122 pipei.close()
123 123
124 124 if pipee:
125 125 # Try to read from the err descriptor until EOF.
126 126 try:
127 127 for l in pipee:
128 128 ui.status(_('remote: '), l)
129 129 except (IOError, ValueError):
130 130 pass
131 131
132 132 pipee.close()
133 133
134 134 def _makeconnection(ui, sshcmd, args, remotecmd, path, sshenv=None):
135 135 """Create an SSH connection to a server.
136 136
137 137 Returns a tuple of (process, stdin, stdout, stderr) for the
138 138 spawned process.
139 139 """
140 140 cmd = '%s %s %s' % (
141 141 sshcmd,
142 142 args,
143 143 util.shellquote('%s -R %s serve --stdio' % (
144 144 _serverquote(remotecmd), _serverquote(path))))
145 145
146 146 ui.debug('running %s\n' % cmd)
147 147 cmd = util.quotecommand(cmd)
148 148
149 149 # no buffer allow the use of 'select'
150 150 # feel free to remove buffering and select usage when we ultimately
151 151 # move to threading.
152 152 stdin, stdout, stderr, proc = util.popen4(cmd, bufsize=0, env=sshenv)
153 153
154 154 stdout = doublepipe(ui, util.bufferedinputpipe(stdout), stderr)
155 155 stdin = doublepipe(ui, stdin, stderr)
156 156
157 157 return proc, stdin, stdout, stderr
158 158
159 159 def _performhandshake(ui, stdin, stdout, stderr):
160 160 def badresponse():
161 161 msg = _('no suitable response from remote hg')
162 162 hint = ui.config('ui', 'ssherrorhint')
163 163 raise error.RepoError(msg, hint=hint)
164 164
165 # The handshake consists of sending 2 wire protocol commands:
166 # ``hello`` and ``between``.
167 #
168 # The ``hello`` command (which was introduced in Mercurial 0.9.1)
169 # instructs the server to advertise its capabilities.
170 #
171 # The ``between`` command (which has existed in all Mercurial servers
172 # for as long as SSH support has existed), asks for the set of revisions
173 # between a pair of revisions.
174 #
175 # The ``between`` command is issued with a request for the null
176 # range. If the remote is a Mercurial server, this request will
177 # generate a specific response: ``1\n\n``. This represents the
178 # wire protocol encoded value for ``\n``. We look for ``1\n\n``
179 # in the output stream and know this is the response to ``between``
180 # and we're at the end of our handshake reply.
181 #
182 # The response to the ``hello`` command will be a line with the
183 # length of the value returned by that command followed by that
184 # value. If the server doesn't support ``hello`` (which should be
185 # rare), that line will be ``0\n``. Otherwise, the value will contain
186 # RFC 822 like lines. Of these, the ``capabilities:`` line contains
187 # the capabilities of the server.
188 #
189 # In addition to the responses to our command requests, the server
190 # may emit "banner" output on stdout. SSH servers are allowed to
191 # print messages to stdout on login. Issuing commands on connection
192 # allows us to flush this banner output from the server by scanning
193 # for output to our well-known ``between`` command. Of course, if
194 # the banner contains ``1\n\n``, this will throw off our detection.
195
165 196 requestlog = ui.configbool('devel', 'debug.peer-request')
166 197
167 198 try:
168 199 pairsarg = '%s-%s' % ('0' * 40, '0' * 40)
169 200 handshake = [
170 201 'hello\n',
171 202 'between\n',
172 203 'pairs %d\n' % len(pairsarg),
173 204 pairsarg,
174 205 ]
175 206
176 207 if requestlog:
177 208 ui.debug('devel-peer-request: hello\n')
178 209 ui.debug('sending hello command\n')
179 210 if requestlog:
180 211 ui.debug('devel-peer-request: between\n')
181 212 ui.debug('devel-peer-request: pairs: %d bytes\n' % len(pairsarg))
182 213 ui.debug('sending between command\n')
183 214
184 215 stdin.write(''.join(handshake))
185 216 stdin.flush()
186 217 except IOError:
187 218 badresponse()
188 219
189 220 lines = ['', 'dummy']
190 221 max_noise = 500
191 222 while lines[-1] and max_noise:
192 223 try:
193 224 l = stdout.readline()
194 225 _forwardoutput(ui, stderr)
195 226 if lines[-1] == '1\n' and l == '\n':
196 227 break
197 228 if l:
198 229 ui.debug('remote: ', l)
199 230 lines.append(l)
200 231 max_noise -= 1
201 232 except IOError:
202 233 badresponse()
203 234 else:
204 235 badresponse()
205 236
206 237 caps = set()
207 238 for l in reversed(lines):
239 # Look for response to ``hello`` command. Scan from the back so
240 # we don't misinterpret banner output as the command reply.
208 241 if l.startswith('capabilities:'):
209 242 caps.update(l[:-1].split(':')[1].split())
210 243 break
211 244
212 245 return caps
213 246
214 247 class sshpeer(wireproto.wirepeer):
215 248 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps):
216 249 """Create a peer from an existing SSH connection.
217 250
218 251 ``proc`` is a handle on the underlying SSH process.
219 252 ``stdin``, ``stdout``, and ``stderr`` are handles on the stdio
220 253 pipes for that process.
221 254 ``caps`` is a set of capabilities supported by the remote.
222 255 """
223 256 self._url = url
224 257 self._ui = ui
225 258 # self._subprocess is unused. Keeping a handle on the process
226 259 # holds a reference and prevents it from being garbage collected.
227 260 self._subprocess = proc
228 261 self._pipeo = stdin
229 262 self._pipei = stdout
230 263 self._pipee = stderr
231 264 self._caps = caps
232 265
233 266 # Begin of _basepeer interface.
234 267
235 268 @util.propertycache
236 269 def ui(self):
237 270 return self._ui
238 271
239 272 def url(self):
240 273 return self._url
241 274
242 275 def local(self):
243 276 return None
244 277
245 278 def peer(self):
246 279 return self
247 280
248 281 def canpush(self):
249 282 return True
250 283
251 284 def close(self):
252 285 pass
253 286
254 287 # End of _basepeer interface.
255 288
256 289 # Begin of _basewirecommands interface.
257 290
258 291 def capabilities(self):
259 292 return self._caps
260 293
261 294 # End of _basewirecommands interface.
262 295
263 296 def _readerr(self):
264 297 _forwardoutput(self.ui, self._pipee)
265 298
266 299 def _abort(self, exception):
267 300 self._cleanup()
268 301 raise exception
269 302
270 303 def _cleanup(self):
271 304 _cleanuppipes(self.ui, self._pipei, self._pipeo, self._pipee)
272 305
273 306 __del__ = _cleanup
274 307
275 308 def _submitbatch(self, req):
276 309 rsp = self._callstream("batch", cmds=wireproto.encodebatchcmds(req))
277 310 available = self._getamount()
278 311 # TODO this response parsing is probably suboptimal for large
279 312 # batches with large responses.
280 313 toread = min(available, 1024)
281 314 work = rsp.read(toread)
282 315 available -= toread
283 316 chunk = work
284 317 while chunk:
285 318 while ';' in work:
286 319 one, work = work.split(';', 1)
287 320 yield wireproto.unescapearg(one)
288 321 toread = min(available, 1024)
289 322 chunk = rsp.read(toread)
290 323 available -= toread
291 324 work += chunk
292 325 yield wireproto.unescapearg(work)
293 326
294 327 def _callstream(self, cmd, **args):
295 328 args = pycompat.byteskwargs(args)
296 329 if (self.ui.debugflag
297 330 and self.ui.configbool('devel', 'debug.peer-request')):
298 331 dbg = self.ui.debug
299 332 line = 'devel-peer-request: %s\n'
300 333 dbg(line % cmd)
301 334 for key, value in sorted(args.items()):
302 335 if not isinstance(value, dict):
303 336 dbg(line % ' %s: %d bytes' % (key, len(value)))
304 337 else:
305 338 for dk, dv in sorted(value.items()):
306 339 dbg(line % ' %s-%s: %d' % (key, dk, len(dv)))
307 340 self.ui.debug("sending %s command\n" % cmd)
308 341 self._pipeo.write("%s\n" % cmd)
309 342 _func, names = wireproto.commands[cmd]
310 343 keys = names.split()
311 344 wireargs = {}
312 345 for k in keys:
313 346 if k == '*':
314 347 wireargs['*'] = args
315 348 break
316 349 else:
317 350 wireargs[k] = args[k]
318 351 del args[k]
319 352 for k, v in sorted(wireargs.iteritems()):
320 353 self._pipeo.write("%s %d\n" % (k, len(v)))
321 354 if isinstance(v, dict):
322 355 for dk, dv in v.iteritems():
323 356 self._pipeo.write("%s %d\n" % (dk, len(dv)))
324 357 self._pipeo.write(dv)
325 358 else:
326 359 self._pipeo.write(v)
327 360 self._pipeo.flush()
328 361
329 362 return self._pipei
330 363
331 364 def _callcompressable(self, cmd, **args):
332 365 return self._callstream(cmd, **args)
333 366
334 367 def _call(self, cmd, **args):
335 368 self._callstream(cmd, **args)
336 369 return self._recv()
337 370
338 371 def _callpush(self, cmd, fp, **args):
339 372 r = self._call(cmd, **args)
340 373 if r:
341 374 return '', r
342 375 for d in iter(lambda: fp.read(4096), ''):
343 376 self._send(d)
344 377 self._send("", flush=True)
345 378 r = self._recv()
346 379 if r:
347 380 return '', r
348 381 return self._recv(), ''
349 382
350 383 def _calltwowaystream(self, cmd, fp, **args):
351 384 r = self._call(cmd, **args)
352 385 if r:
353 386 # XXX needs to be made better
354 387 raise error.Abort(_('unexpected remote reply: %s') % r)
355 388 for d in iter(lambda: fp.read(4096), ''):
356 389 self._send(d)
357 390 self._send("", flush=True)
358 391 return self._pipei
359 392
360 393 def _getamount(self):
361 394 l = self._pipei.readline()
362 395 if l == '\n':
363 396 self._readerr()
364 397 msg = _('check previous remote output')
365 398 self._abort(error.OutOfBandError(hint=msg))
366 399 self._readerr()
367 400 try:
368 401 return int(l)
369 402 except ValueError:
370 403 self._abort(error.ResponseError(_("unexpected response:"), l))
371 404
372 405 def _recv(self):
373 406 return self._pipei.read(self._getamount())
374 407
375 408 def _send(self, data, flush=False):
376 409 self._pipeo.write("%d\n" % len(data))
377 410 if data:
378 411 self._pipeo.write(data)
379 412 if flush:
380 413 self._pipeo.flush()
381 414 self._readerr()
382 415
383 416 def instance(ui, path, create):
384 417 """Create an SSH peer.
385 418
386 419 The returned object conforms to the ``wireproto.wirepeer`` interface.
387 420 """
388 421 u = util.url(path, parsequery=False, parsefragment=False)
389 422 if u.scheme != 'ssh' or not u.host or u.path is None:
390 423 raise error.RepoError(_("couldn't parse location %s") % path)
391 424
392 425 util.checksafessh(path)
393 426
394 427 if u.passwd is not None:
395 428 raise error.RepoError(_('password in URL not supported'))
396 429
397 430 sshcmd = ui.config('ui', 'ssh')
398 431 remotecmd = ui.config('ui', 'remotecmd')
399 432 sshaddenv = dict(ui.configitems('sshenv'))
400 433 sshenv = util.shellenviron(sshaddenv)
401 434 remotepath = u.path or '.'
402 435
403 436 args = util.sshargs(sshcmd, u.host, u.user, u.port)
404 437
405 438 if create:
406 439 cmd = '%s %s %s' % (sshcmd, args,
407 440 util.shellquote('%s init %s' %
408 441 (_serverquote(remotecmd), _serverquote(remotepath))))
409 442 ui.debug('running %s\n' % cmd)
410 443 res = ui.system(cmd, blockedtag='sshpeer', environ=sshenv)
411 444 if res != 0:
412 445 raise error.RepoError(_('could not create remote repo'))
413 446
414 447 proc, stdin, stdout, stderr = _makeconnection(ui, sshcmd, args, remotecmd,
415 448 remotepath, sshenv)
416 449
417 450 try:
418 451 caps = _performhandshake(ui, stdin, stdout, stderr)
419 452 except Exception:
420 453 _cleanuppipes(ui, stdout, stdin, stderr)
421 454 raise
422 455
423 456 return sshpeer(ui, path, proc, stdin, stdout, stderr, caps)
General Comments 0
You need to be logged in to leave comments. Login now